[GEARPUMP-262] Add setup and teardown to user defined functions
Author: manuzhang <owenzhang1990@gmail.com>
Closes #131 from manuzhang/setup_teardown.
diff --git a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
index 49d3619..cbfe57a 100644
--- a/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
+++ b/examples/streaming/kafka/src/main/scala/org/apache/gearpump/streaming/examples/kafka/dsl/KafkaReadWrite.scala
@@ -21,8 +21,8 @@
import java.util.Properties
import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.cluster.main.{CLIOption, ArgumentsParser}
-import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
import org.apache.gearpump.streaming.kafka.KafkaStoreFactory
import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL
import org.apache.gearpump.streaming.kafka.dsl.KafkaDSL._
diff --git a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
index d4866ed..2942861 100644
--- a/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
+++ b/examples/streaming/wordcount-java/src/main/java/org/apache/gearpump/streaming/examples/wordcountjava/dsl/WordCount.java
@@ -25,12 +25,17 @@
import org.apache.gearpump.cluster.client.ClientContext;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStream;
import org.apache.gearpump.streaming.dsl.javaapi.JavaStreamApp;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction;
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.FlatMapFunction;
+import org.apache.gearpump.streaming.dsl.javaapi.functions.GroupByFunction;
import org.apache.gearpump.streaming.source.DataSource;
import org.apache.gearpump.streaming.task.TaskContext;
import scala.Tuple2;
import java.time.Instant;
import java.util.Arrays;
+import java.util.Iterator;
/** Java version of WordCount with high level DSL API */
public class WordCount {
@@ -46,15 +51,13 @@
JavaStream<String> sentence = app.source(new StringSource("This is a good start, bingo!! bingo!!"),
1, UserConfig.empty(), "source");
- JavaStream<String> words = sentence.flatMap(s -> Arrays.asList(s.split("\\s+")).iterator(),
- "flatMap");
+ JavaStream<String> words = sentence.flatMap(new Split(), "flatMap");
- JavaStream<Tuple2<String, Integer>> ones = words.map(s -> new Tuple2<>(s, 1), "map");
+ JavaStream<Tuple2<String, Integer>> ones = words.map(new Ones(), "map");
- JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(Tuple2::_1, 1, "groupBy");
+ JavaStream<Tuple2<String, Integer>> groupedOnes = ones.groupBy(new TupleKey(), 1, "groupBy");
- JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(
- (t1, t2) -> new Tuple2<>(t1._1(), t1._2() + t2._2()), "reduce");
+ JavaStream<Tuple2<String, Integer>> wordcount = groupedOnes.reduce(new Count(), "reduce");
wordcount.log();
@@ -88,4 +91,36 @@
return Instant.now();
}
}
+
+ private static class Split extends FlatMapFunction<String, String> {
+
+ @Override
+ public Iterator<String> apply(String s) {
+ return Arrays.asList(s.split("\\s+")).iterator();
+ }
+ }
+
+ private static class Ones extends MapFunction<String, Tuple2<String, Integer>> {
+
+ @Override
+ public Tuple2<String, Integer> apply(String s) {
+ return new Tuple2<>(s, 1);
+ }
+ }
+
+ private static class Count extends ReduceFunction<Tuple2<String, Integer>> {
+
+ @Override
+ public Tuple2<String, Integer> apply(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) {
+ return new Tuple2<>(t1._1(), t1._2() + t2._2());
+ }
+ }
+
+ private static class TupleKey extends GroupByFunction<Tuple2<String, Integer>, String> {
+
+ @Override
+ public String apply(Tuple2<String, Integer> tuple) {
+ return tuple._1();
+ }
+ }
}
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
index 4f43fd4..401eac0 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WindowedWordCount.scala
@@ -22,7 +22,7 @@
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import org.apache.gearpump.streaming.dsl.{LoggerSink, StreamApp}
+import org.apache.gearpump.streaming.dsl.scalaapi.{LoggerSink, StreamApp}
import org.apache.gearpump.streaming.dsl.window.api.{EventTimeTrigger, FixedWindow}
import org.apache.gearpump.streaming.source.DataSource
import org.apache.gearpump.streaming.task.TaskContext
diff --git a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
index 22f597c..1cbfb22 100644
--- a/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
+++ b/examples/streaming/wordcount/src/main/scala/org/apache/gearpump/streaming/examples/wordcount/dsl/WordCount.scala
@@ -20,8 +20,8 @@
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.main.{ArgumentsParser, CLIOption}
-import org.apache.gearpump.streaming.dsl.StreamApp
-import org.apache.gearpump.streaming.dsl.StreamApp._
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamApp._
import org.apache.gearpump.util.AkkaApp
/** Same WordCount with High level DSL syntax */
diff --git a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
index 2417763..22efa89 100644
--- a/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
+++ b/external/hbase/src/main/scala/org/apache/gearpump/external/hbase/dsl/HBaseDSLSink.scala
@@ -18,13 +18,10 @@
package org.apache.gearpump.external.hbase.dsl
import scala.language.implicitConversions
-
import org.apache.hadoop.conf.Configuration
-
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.external.hbase.HBaseSink
-import org.apache.gearpump.streaming.dsl.Stream
-import org.apache.gearpump.streaming.dsl.Stream.Sink
+import org.apache.gearpump.streaming.dsl.scalaapi.Stream
/** Create a HBase DSL Sink */
class HBaseDSLSink[T](stream: Stream[T]) {
diff --git a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
index f1bb26a..996ae0b 100644
--- a/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
+++ b/external/kafka/src/main/scala/org/apache/gearpump/streaming/kafka/dsl/KafkaDSL.scala
@@ -21,7 +21,7 @@
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.dsl
-import org.apache.gearpump.streaming.dsl.StreamApp
+import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, StreamApp}
import org.apache.gearpump.streaming.kafka.{KafkaSink, KafkaSource}
import org.apache.gearpump.streaming.transaction.api.CheckpointStoreFactory
@@ -44,7 +44,7 @@
parallelism: Int = 1,
config: UserConfig = UserConfig.empty,
description: String = "KafkaSource"
- ): dsl.Stream[T] = {
+ ): Stream[T] = {
app.source[T](new KafkaSource(topics, properties), parallelism, config, description)
}
@@ -66,19 +66,19 @@
properties: Properties,
parallelism: Int = 1,
config: UserConfig = UserConfig.empty,
- description: String = "KafkaSource"): dsl.Stream[T] = {
+ description: String = "KafkaSource"): Stream[T] = {
val source = new KafkaSource(topics, properties)
source.setCheckpointStore(checkpointStoreFactory)
app.source[T](source, parallelism, config, description)
}
import scala.language.implicitConversions
- implicit def streamToKafkaDSL[T](stream: dsl.Stream[T]): KafkaDSL[T] = {
+ implicit def streamToKafkaDSL[T](stream: Stream[T]): KafkaDSL[T] = {
new KafkaDSL[T](stream)
}
}
-class KafkaDSL[T](stream: dsl.Stream[T]) {
+class KafkaDSL[T](stream: Stream[T]) {
/**
* Sinks data to Kafka
@@ -94,7 +94,7 @@
properties: Properties,
parallelism: Int = 1,
userConfig: UserConfig = UserConfig.empty,
- description: String = "KafkaSink"): dsl.Stream[T] = {
+ description: String = "KafkaSink"): Stream[T] = {
stream.sink(new KafkaSink(topic, properties), parallelism, userConfig, description)
}
}
\ No newline at end of file
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
deleted file mode 100644
index 9788dd2..0000000
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FlatMapFunction.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-import java.util.Iterator;
-
-/**
- * Function that converts a value of type T to a iterator of values of type R.
- *
- * @param <T> Input value type
- * @param <R> Return value type
- */
-public interface FlatMapFunction<T, R> extends Serializable {
- Iterator<R> apply(T t);
-}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
deleted file mode 100644
index 6c71280..0000000
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/GroupByFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * GroupBy function which assign value of type T to groups
- *
- * @param <T> Input value type
- * @param <Group> Group Type
- */
-public interface GroupByFunction<T, Group> extends Serializable {
- Group apply(T t);
-}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
deleted file mode 100644
index e1fc821..0000000
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/MapFunction.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * Function that map a value of type T to value of type R
- *
- * @param <T> Input value type
- * @param <R> Output value type
- */
-public interface MapFunction<T, R> extends Serializable {
- R apply(T t);
-}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java b/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
deleted file mode 100644
index 2bcac60..0000000
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/ReduceFunction.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
-
-/**
- * Function that applies reduce operation
- *
- * @param <T> Input value type
- */
-public interface ReduceFunction<T> extends Serializable {
- T apply(T t1, T t2);
-}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
similarity index 61%
copy from streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
copy to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
index f07ceff..e4e7309 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/FilterFunction.scala
@@ -15,16 +15,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.gearpump.streaming.dsl.api.functions
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-import java.io.Serializable;
+object FilterFunction {
+
+ def apply[T](fn: T => Boolean): FilterFunction[T] = {
+ new FilterFunction[T] {
+ override def apply(t: T): Boolean = {
+ fn(t)
+ }
+ }
+ }
+}
/**
- * Filter function
+ * Returns true to keep the input and false otherwise.
*
- * @param <T> Message of type T
+ * @param T Input value type
*/
-public interface FilterFunction<T> extends Serializable {
- boolean apply(T t);
+abstract class FilterFunction[T] extends SerializableFunction {
+
+ def apply(t: T): Boolean
+
}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
similarity index 60%
copy from streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
copy to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
index f07ceff..70fe9d4 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/MapFunction.scala
@@ -15,16 +15,29 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.gearpump.streaming.dsl.api.functions
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-import java.io.Serializable;
+object MapFunction {
+
+ def apply[T, R](fn: T => R): MapFunction[T, R] = {
+ new MapFunction[T, R] {
+ override def apply(t: T): R = {
+ fn(t)
+ }
+ }
+ }
+}
/**
- * Filter function
+ * Transforms an input into an output of possibly different types.
*
- * @param <T> Message of type T
+ * @param T Input value type
+ * @param R Output value type
*/
-public interface FilterFunction<T> extends Serializable {
- boolean apply(T t);
+abstract class MapFunction[T, R] extends SerializableFunction {
+
+ def apply(t: T): R
+
}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
similarity index 60%
copy from streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
copy to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
index f07ceff..25b12be 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/api/functions/ReduceFunction.scala
@@ -15,16 +15,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.gearpump.streaming.dsl.api.functions
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
-import java.io.Serializable;
+object ReduceFunction {
+
+ def apply[T](fn: (T, T) => T): ReduceFunction[T] = {
+ new ReduceFunction[T] {
+ override def apply(t1: T, t2: T): T = {
+ fn(t1, t2)
+ }
+ }
+ }
+}
/**
- * Filter function
+ * Combines two inputs into one output of the same type.
*
- * @param <T> Message of type T
+ * @param T Type of both inputs and output
*/
-public interface FilterFunction<T> extends Serializable {
- boolean apply(T t);
+abstract class ReduceFunction[T] extends SerializableFunction {
+
+ def apply(t1: T, t2: T): T
+
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
index f2654ea..7f3c250 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStream.scala
@@ -15,14 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.gearpump.streaming.dsl.javaapi
-import scala.collection.JavaConverters._
import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction, GroupByFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.{Stream, WindowStream}
import org.apache.gearpump.streaming.dsl.window.api.Window
-import org.apache.gearpump.streaming.dsl.{Stream, WindowStream}
-import org.apache.gearpump.streaming.javaapi.dsl.functions._
import org.apache.gearpump.streaming.task.Task
/**
@@ -31,23 +31,23 @@
class JavaStream[T](val stream: Stream[T]) {
/** FlatMap on stream */
- def flatMap[R](fn: FlatMapFunction[T, R], description: String): JavaStream[R] = {
- new JavaStream[R](stream.flatMap({ t: T => fn(t).asScala }, description))
+ def flatMap[R](fn: JFlatMapFunction[T, R], description: String): JavaStream[R] = {
+ new JavaStream[R](stream.flatMap(FlatMapFunction(fn), "flatMap"))
}
/** Map on stream */
def map[R](fn: MapFunction[T, R], description: String): JavaStream[R] = {
- new JavaStream[R](stream.map({ t: T => fn(t) }, description))
+ new JavaStream[R](stream.flatMap(FlatMapFunction(fn), description))
}
/** Only keep the messages that FilterFunction returns true. */
def filter(fn: FilterFunction[T], description: String): JavaStream[T] = {
- new JavaStream[T](stream.filter({ t: T => fn(t) }, description))
+ new JavaStream[T](stream.flatMap(FlatMapFunction(fn), description))
}
/** Does aggregation on the stream */
def reduce(fn: ReduceFunction[T], description: String): JavaStream[T] = {
- new JavaStream[T](stream.reduce({ (t1: T, t2: T) => fn(t1, t2) }, description))
+ new JavaStream[T](stream.reduce(fn, description))
}
def log(): Unit = {
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
index 82a284e..b8d1f4c 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/JavaStreamApp.scala
@@ -19,13 +19,14 @@
package org.apache.gearpump.streaming.dsl.javaapi
import java.util.Collection
-import scala.collection.JavaConverters._
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.client.ClientContext
-import org.apache.gearpump.streaming.dsl.{CollectionDataSource, StreamApp}
+import org.apache.gearpump.streaming.dsl.scalaapi.{CollectionDataSource, StreamApp}
import org.apache.gearpump.streaming.source.DataSource
+import scala.collection.JavaConverters._
+
class JavaStreamApp(name: String, context: ClientContext, userConfig: UserConfig) {
private val streamApp = StreamApp(name, context, userConfig)
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
similarity index 63%
copy from streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
copy to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
index f07ceff..85d597d 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/FlatMapFunction.scala
@@ -15,16 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.gearpump.streaming.dsl.javaapi.functions
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.SerializableFunction
/**
- * Filter function
+ * Transforms one input into zero or more outputs of possibly different types.
+ * This Java version of FlatMapFunction returns a java.util.Iterator.
*
- * @param <T> Message of type T
+ * @param T Input value type
+ * @param R Output value type
*/
-public interface FilterFunction<T> extends Serializable {
- boolean apply(T t);
+abstract class FlatMapFunction[T, R] extends SerializableFunction {
+
+ def apply(t: T): java.util.Iterator[R]
}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
similarity index 72%
rename from streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
index f07ceff..7656cba 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/javaapi/functions/GroupByFunction.scala
@@ -15,16 +15,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+package org.apache.gearpump.streaming.dsl.javaapi.functions
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
+import org.apache.gearpump.streaming.dsl.api.functions.MapFunction
/**
- * Filter function
+ * Assigns the input value into a group.
*
- * @param <T> Message of type T
+ * @param T Input value type
+ * @param GROUP Group value type
*/
-public interface FilterFunction<T> extends Serializable {
- boolean apply(T t);
-}
+abstract class GroupByFunction[T, GROUP] extends MapFunction[T, GROUP]
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
index f15d875..82ea7c7 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/OP.scala
@@ -22,7 +22,7 @@
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.streaming.Constants._
import org.apache.gearpump.streaming.Processor.DefaultProcessor
-import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, SingleInputFunction}
import org.apache.gearpump.streaming.{Constants, Processor}
import org.apache.gearpump.streaming.dsl.task.TransformTask
import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
@@ -134,7 +134,7 @@
other match {
case op: ChainableOp[OUT, _] =>
// TODO: preserve type info
- ChainableOp(fn.andThen(op.fn))
+ ChainableOp(AndThen(fn, op.fn))
case _ =>
throw new OpChainException(this, other)
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
index 5322648..687fd2e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunction.scala
@@ -17,23 +17,35 @@
*/
package org.apache.gearpump.streaming.dsl.plan.functions
-trait SingleInputFunction[IN, OUT] extends Serializable {
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
+
+/**
+ * Internal function to process single input
+ *
+ * @param IN input value type
+ * @param OUT output value type
+ */
+sealed trait SingleInputFunction[IN, OUT] extends java.io.Serializable {
+
+ def setup(): Unit = {}
+
def process(value: IN): TraversableOnce[OUT]
- def andThen[OUTER](other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
- AndThen(this, other)
- }
+
def finish(): TraversableOnce[OUT] = None
- def clearState(): Unit = {}
+
+ def teardown(): Unit = {}
+
def description: String
}
-case class AndThen[IN, MIDDLE, OUT](
- first: SingleInputFunction[IN, MIDDLE], second: SingleInputFunction[MIDDLE, OUT])
+case class AndThen[IN, MIDDLE, OUT](first: SingleInputFunction[IN, MIDDLE],
+ second: SingleInputFunction[MIDDLE, OUT])
extends SingleInputFunction[IN, OUT] {
- override def andThen[OUTER](
- other: SingleInputFunction[OUT, OUTER]): SingleInputFunction[IN, OUTER] = {
- first.andThen(second.andThen(other))
+ override def setup(): Unit = {
+ first.setup()
+ second.setup()
}
override def process(value: IN): TraversableOnce[OUT] = {
@@ -49,9 +61,9 @@
}
}
- override def clearState(): Unit = {
- first.clearState()
- second.clearState()
+ override def teardown(): Unit = {
+ first.teardown()
+ second.teardown()
}
override def description: String = {
@@ -61,22 +73,31 @@
}
}
-class FlatMapFunction[IN, OUT](fn: IN => TraversableOnce[OUT], descriptionMessage: String)
+class FlatMapper[IN, OUT](fn: FlatMapFunction[IN, OUT], val description: String)
extends SingleInputFunction[IN, OUT] {
+ override def setup(): Unit = {
+ fn.setup()
+ }
+
override def process(value: IN): TraversableOnce[OUT] = {
fn(value)
}
- override def description: String = descriptionMessage
+ override def teardown(): Unit = {
+ fn.teardown()
+ }
}
-
-class ReduceFunction[T](fn: (T, T) => T, descriptionMessage: String)
+class Reducer[T](fn: ReduceFunction[T], val description: String)
extends SingleInputFunction[T, T] {
private var state: Option[T] = None
+ override def setup(): Unit = {
+ fn.setup()
+ }
+
override def process(value: T): TraversableOnce[T] = {
if (state.isEmpty) {
state = Option(value)
@@ -90,23 +111,18 @@
state
}
- override def clearState(): Unit = {
+ override def teardown(): Unit = {
state = None
+ fn.teardown()
}
-
- override def description: String = descriptionMessage
}
-class EmitFunction[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
+class Emit[T](emit: T => Unit) extends SingleInputFunction[T, Unit] {
override def process(value: T): TraversableOnce[Unit] = {
emit(value)
None
}
- override def andThen[R](other: SingleInputFunction[Unit, R]): SingleInputFunction[T, R] = {
- throw new UnsupportedOperationException("andThen is not supposed to be called on EmitFunction")
- }
-
override def description: String = ""
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
similarity index 72%
rename from streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
index 440a45e..430d795 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/Stream.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/Stream.scala
@@ -16,14 +16,16 @@
* limitations under the License.
*/
-package org.apache.gearpump.streaming.dsl
+package org.apache.gearpump.streaming.dsl.scalaapi
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
import org.apache.gearpump.streaming.dsl.plan._
-import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions._
import org.apache.gearpump.streaming.dsl.window.api._
-import org.apache.gearpump.streaming.dsl.window.impl._
+import org.apache.gearpump.streaming.dsl.window.impl.{Bucket, GroupAlsoByWindow}
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.task.{Task, TaskContext}
import org.apache.gearpump.util.Graph
@@ -36,55 +38,95 @@
private val edge: Option[OpEdge] = None) {
/**
- * converts a value[T] to a list of value[R]
+ * Returns a new stream by applying a flatMap function to each element
+ * and flattening the results.
*
- * @param fn FlatMap function
+ * @param fn flatMap function
* @param description The description message for this operation
* @return A new stream with type [R]
*/
def flatMap[R](fn: T => TraversableOnce[R], description: String = "flatMap"): Stream[R] = {
- val flatMapOp = ChainableOp(new FlatMapFunction[T, R](fn, description))
- graph.addVertex(flatMapOp)
- graph.addEdge(thisNode, edge.getOrElse(Direct), flatMapOp)
- new Stream[R](graph, flatMapOp)
+ this.flatMap(FlatMapFunction(fn), description)
}
/**
- * Maps message of type T message of type R
+ * Returns a new stream by applying a flatMap function to each element
+ * and flattening the results.
*
- * @param fn Function
+ * @param fn flatMap function
+ * @param description The description message for this operation
+ * @return A new stream with type [R]
+ */
+ def flatMap[R](fn: FlatMapFunction[T, R], description: String): Stream[R] = {
+ transform(new FlatMapper[T, R](fn, description))
+ }
+
+ /**
+ * Returns a new stream by applying a map function to each element.
+ *
+ * @param fn map function
* @return A new stream with type [R]
*/
def map[R](fn: T => R, description: String = "map"): Stream[R] = {
- this.flatMap({ data =>
- Option(fn(data))
- }, description)
+ this.map(MapFunction(fn), description)
}
/**
- * Keeps records when fun(T) == true
+ * Returns a new stream by applying a map function to each element.
*
- * @param fn the filter
- * @return a new stream after filter
+ * @param fn map function
+ * @return A new stream with type [R]
+ */
+ def map[R](fn: MapFunction[T, R], description: String): Stream[R] = {
+ this.flatMap(FlatMapFunction(fn), description)
+ }
+
+ /**
+ * Returns a new Stream keeping the elements that satisfy the filter function.
+ *
+ * @param fn filter function
+ * @return a new stream after filter
*/
def filter(fn: T => Boolean, description: String = "filter"): Stream[T] = {
- this.flatMap({ data =>
- if (fn(data)) Option(data) else None
- }, description)
+ this.filter(FilterFunction(fn), description)
}
/**
- * Reduces operations.
+ * Returns a new Stream keeping the elements that satisfy the filter function.
*
- * @param fn reduction function
+ * @param fn filter function
+ * @return a new stream after filter
+ */
+ def filter(fn: FilterFunction[T], description: String): Stream[T] = {
+ this.flatMap(FlatMapFunction(fn), description)
+ }
+ /**
+ * Returns a new stream by applying a reduce function over all the elements.
+ *
+ * @param fn reduce function
* @param description description message for this operator
- * @return a new stream after reduction
+ * @return a new stream after reduce
*/
def reduce(fn: (T, T) => T, description: String = "reduce"): Stream[T] = {
- val reduceOp = ChainableOp(new ReduceFunction(fn, description))
- graph.addVertex(reduceOp)
- graph.addEdge(thisNode, edge.getOrElse(Direct), reduceOp)
- new Stream(graph, reduceOp)
+ reduce(ReduceFunction(fn), description)
+ }
+
+ /**
+ * Returns a new stream by applying a reduce function over all the elements.
+ *
+ * @param fn reduce function
+ * @param description description message for this operator
+ * @return a new stream after reduce
+ */
+ def reduce(fn: ReduceFunction[T], description: String): Stream[T] = {
+ transform(new Reducer[T](fn, description))
+ }
+
+ private def transform[R](fn: SingleInputFunction[T, R]): Stream[R] = {
+ val op = ChainableOp(fn)
+ graph.addVertex(op)
+ graph.addEdge(thisNode, edge.getOrElse(Direct), op)
+ new Stream(graph, op)
}
/**
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
similarity index 98%
rename from streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
rename to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
index 8116146..d6eed2e 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/StreamApp.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamApp.scala
@@ -16,11 +16,12 @@
* limitations under the License.
*/
-package org.apache.gearpump.streaming.dsl
+package org.apache.gearpump.streaming.dsl.scalaapi
import java.time.Instant
import akka.actor.ActorSystem
+import org.apache.gearpump.Message
import org.apache.gearpump.cluster.UserConfig
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.streaming.StreamApplication
@@ -28,7 +29,6 @@
import org.apache.gearpump.streaming.source.DataSource
import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.util.Graph
-import org.apache.gearpump.Message
import scala.language.implicitConversions
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
new file mode 100644
index 0000000..f10a3db
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/FlatMapFunction.scala
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gearpump.streaming.dsl.scalaapi.functions
+
+import org.apache.gearpump.streaming.dsl.api.functions.{FilterFunction, MapFunction}
+import org.apache.gearpump.streaming.dsl.javaapi.functions.{FlatMapFunction => JFlatMapFunction}
+
+import scala.collection.JavaConverters._
+
+object FlatMapFunction {
+
+ def apply[T, R](fn: JFlatMapFunction[T, R]): FlatMapFunction[T, R] = {
+ new FlatMapFunction[T, R] {
+
+ override def setup(): Unit = {
+ fn.setup()
+ }
+
+ override def apply(t: T): TraversableOnce[R] = {
+ fn.apply(t).asScala
+ }
+
+ override def teardown(): Unit = {
+ fn.teardown()
+ }
+ }
+ }
+
+ def apply[T, R](fn: T => TraversableOnce[R]): FlatMapFunction[T, R] = {
+ new FlatMapFunction[T, R] {
+ override def apply(t: T): TraversableOnce[R] = {
+ fn(t)
+ }
+ }
+ }
+
+ def apply[T, R](fn: MapFunction[T, R]): FlatMapFunction[T, R] = {
+ new FlatMapFunction[T, R] {
+
+ override def setup(): Unit = {
+ fn.setup()
+ }
+
+ override def apply(t: T): TraversableOnce[R] = {
+ Option(fn(t))
+ }
+
+ override def teardown(): Unit = {
+ fn.teardown()
+ }
+ }
+ }
+
+ def apply[T, R](fn: FilterFunction[T]): FlatMapFunction[T, T] = {
+ new FlatMapFunction[T, T] {
+
+ override def setup(): Unit = {
+ fn.setup()
+ }
+
+ override def apply(t: T): TraversableOnce[T] = {
+ if (fn(t)) {
+ Option(t)
+ } else {
+ None
+ }
+ }
+
+ override def teardown(): Unit = {
+ fn.teardown()
+ }
+ }
+ }
+}
+
+/**
+ * Transforms one input into zero or more outputs of possibly different types.
+ * This Scala version of FlatMapFunction returns a TraversableOnce.
+ *
+ * @tparam T Input value type
+ * @tparam R Output value type
+ */
+abstract class FlatMapFunction[T, R] extends SerializableFunction {
+
+ def apply(t: T): TraversableOnce[R]
+
+}
diff --git a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
similarity index 65%
copy from streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
copy to streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
index f07ceff..ab88bf1 100644
--- a/streaming/src/main/java/org/apache/gearpump/streaming/javaapi/dsl/functions/FilterFunction.java
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/scalaapi/functions/SerializableFunction.scala
@@ -15,16 +15,18 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-package org.apache.gearpump.streaming.javaapi.dsl.functions;
-
-import java.io.Serializable;
+package org.apache.gearpump.streaming.dsl.scalaapi.functions
/**
- * Filter function
- *
- * @param <T> Message of type T
+ * Superclass for all user defined function interfaces.
+ * This ensures all functions are serializable and provides common methods
+ * like setup and teardown. Users should not extend this class directly,
+ * but should extend subclasses such as [[FlatMapFunction]].
*/
-public interface FilterFunction<T> extends Serializable {
- boolean apply(T t);
+abstract class SerializableFunction extends java.io.Serializable {
+
+ def setup(): Unit = {}
+
+ def teardown(): Unit = {}
+
}
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
index e35f085..c13a4fb 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/task/TransformTask.scala
@@ -23,9 +23,8 @@
import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
import org.apache.gearpump.streaming.task.{Task, TaskContext}
-class TransformTask[IN, OUT](
- operator: Option[SingleInputFunction[IN, OUT]], taskContext: TaskContext,
- userConf: UserConfig) extends Task(taskContext, userConf) {
+class TransformTask[IN, OUT](operator: Option[SingleInputFunction[IN, OUT]],
+ taskContext: TaskContext, userConf: UserConfig) extends Task(taskContext, userConf) {
def this(taskContext: TaskContext, userConf: UserConfig) = {
this(userConf.getValue[SingleInputFunction[IN, OUT]](
diff --git a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
index d87a9e4..223a4af 100644
--- a/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
+++ b/streaming/src/main/scala/org/apache/gearpump/streaming/dsl/window/impl/WindowRunner.scala
@@ -28,7 +28,7 @@
import com.gs.collections.impl.map.sorted.mutable.TreeSortedMap
import com.gs.collections.impl.set.mutable.UnifiedSet
import org.apache.gearpump.streaming.Constants._
-import org.apache.gearpump.streaming.dsl.plan.functions.{EmitFunction, SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions.{AndThen, Emit, SingleInputFunction}
import org.apache.gearpump.streaming.dsl.window.api.Discarding
import org.apache.gearpump.streaming.task.TaskContext
import org.apache.gearpump.util.LogUtil
@@ -39,7 +39,6 @@
def process(message: Message): Unit
def trigger(time: Instant): Unit
-
}
object DefaultWindowRunner {
@@ -59,7 +58,6 @@
private val windowGroups = new UnifiedMap[WindowGroup[GROUP], FastList[IN]]
private val groupFns = new UnifiedMap[GROUP, SingleInputFunction[IN, OUT]]
-
override def process(message: Message): Unit = {
val (group, buckets) = groupBy.groupBy(message)
buckets.foreach { bucket =>
@@ -72,8 +70,11 @@
inputs.add(message.msg.asInstanceOf[IN])
windowGroups.put(wg, inputs)
}
- groupFns.putIfAbsent(group,
- userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get)
+ if (!groupFns.containsKey(group)) {
+ val fn = userConfig.getValue[SingleInputFunction[IN, OUT]](GEARPUMP_STREAMING_OPERATOR).get
+ fn.setup()
+ groupFns.put(group, fn)
+ }
}
override def trigger(time: Instant): Unit = {
@@ -88,8 +89,7 @@
wgs.forEach(new Procedure[WindowGroup[GROUP]] {
override def value(each: WindowGroup[GROUP]): Unit = {
val inputs = windowGroups.remove(each)
- val reduceFn = groupFns.get(each.group)
- .andThen[Unit](new EmitFunction[OUT](emitResult(_, time)))
+ val reduceFn = AndThen(groupFns.get(each.group), new Emit[OUT](emitResult(_, time)))
inputs.forEach(new Procedure[IN] {
override def value(t: IN): Unit = {
// .toList forces eager evaluation
@@ -99,7 +99,7 @@
// .toList forces eager evaluation
reduceFn.finish().toList
if (groupBy.window.accumulationMode == Discarding) {
- reduceFn.clearState()
+ reduceFn.teardown()
}
}
})
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
index 98bf24f..f0920de 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/OpSpec.scala
@@ -25,7 +25,8 @@
import org.apache.gearpump.streaming.Processor
import org.apache.gearpump.streaming.Processor.DefaultProcessor
import org.apache.gearpump.streaming.dsl.plan.OpSpec.{AnySink, AnySource, AnyTask}
-import org.apache.gearpump.streaming.dsl.plan.functions.SingleInputFunction
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, SingleInputFunction}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.source.DataSource
@@ -145,7 +146,6 @@
val chainedOp = chainableOp1.chain(chainableOp2)
- verify(fn1).andThen(fn2)
chainedOp shouldBe a[ChainableOp[_, _]]
unchainableOps.foreach { op =>
@@ -156,12 +156,9 @@
}
"get Processor" in {
- val fn = new SingleInputFunction[Any, Any] {
- override def process(value: Any): TraversableOnce[Any] = null
-
- override def description: String = null
- }
- val chainableOp = ChainableOp[Any, Any](fn)
+ val fn = mock[FlatMapFunction[Any, Any]]
+ val flatMapper = new FlatMapper(fn, "flatMap")
+ val chainableOp = ChainableOp[Any, Any](flatMapper)
val processor = chainableOp.getProcessor
processor shouldBe a[Processor[_]]
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
index 1610f0e..3f23fa9 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/PlannerSpec.scala
@@ -23,10 +23,12 @@
import akka.actor.ActorSystem
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
import org.apache.gearpump.streaming.partitioner.CoLocationPartitioner
import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
import org.apache.gearpump.streaming.dsl.plan.PlannerSpec._
-import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapFunction, ReduceFunction}
+import org.apache.gearpump.streaming.dsl.plan.functions.{FlatMapper, Reducer}
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
import org.apache.gearpump.streaming.dsl.window.api.GroupByFn
import org.apache.gearpump.streaming.sink.DataSink
import org.apache.gearpump.streaming.source.DataSource
@@ -56,8 +58,8 @@
val graph = Graph.empty[Op, OpEdge]
val sourceOp = DataSourceOp(new AnySource)
val groupByOp = GroupByOp(new AnyGroupByFn)
- val flatMapOp = ChainableOp[Any, Any](anyFlatMapFunction)
- val reduceOp = ChainableOp[Any, Any](anyReduceFunction)
+ val flatMapOp = ChainableOp[Any, Any](anyFlatMapper)
+ val reduceOp = ChainableOp[Any, Any](anyReducer)
val processorOp = new ProcessorOp[AnyTask]
val sinkOp = DataSinkOp(new AnySink)
val directEdge = Direct
@@ -92,9 +94,10 @@
object PlannerSpec {
private val anyParallelism = 1
- private val anyFlatMapFunction = new FlatMapFunction[Any, Any](Option(_), "flatMap")
- private val anyReduceFunction = new ReduceFunction[Any](
- (left: Any, right: Any) => (left, right), "reduce")
+ private val anyFlatMapper = new FlatMapper[Any, Any](
+ FlatMapFunction(Option(_)), "flatMap")
+ private val anyReducer = new Reducer[Any](
+ ReduceFunction((left: Any, right: Any) => (left, right)), "reduce")
class AnyTask(context: TaskContext, config: UserConfig) extends Task(context, config)
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
index ad12e33..2c03e1c 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/plan/functions/SingleInputFunctionSpec.scala
@@ -23,9 +23,11 @@
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
import org.apache.gearpump.streaming.MockUtil
-import org.apache.gearpump.streaming.dsl.CollectionDataSource
import org.apache.gearpump.streaming.source.DataSourceTask
import org.apache.gearpump.streaming.Constants._
+import org.apache.gearpump.streaming.dsl.api.functions.ReduceFunction
+import org.apache.gearpump.streaming.dsl.scalaapi.CollectionDataSource
+import org.apache.gearpump.streaming.dsl.scalaapi.functions.FlatMapFunction
import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
import org.apache.gearpump.streaming.dsl.window.api.CountWindow
import org.apache.gearpump.streaming.dsl.window.impl.GroupAlsoByWindow
@@ -77,149 +79,31 @@
andThen.finish().toList shouldBe List(secondResult)
}
- "clear both states on clearState" in {
- andThen.clearState()
+ "set up both functions on setup" in {
+ andThen.setup()
- verify(first).clearState()
- verify(second).clearState()
+ verify(first).setup()
+ verify(second).setup()
}
- "return AndThen on andThen" in {
- val third = mock[SingleInputFunction[T, Any]]
- when(second.andThen(third)).thenReturn(AndThen(second, third))
+ "tear down both functions on teardown" in {
+ andThen.teardown()
- andThen.andThen[Any](third)
-
- verify(first).andThen(AndThen(second, third))
- }
- }
-
- "FlatMapFunction" should {
-
- val flatMap = mock[R => TraversableOnce[S]]
- val flatMapFunction = new FlatMapFunction[R, S](flatMap, "flatMap")
-
- "call flatMap function when processing input value" in {
- val input = mock[R]
- flatMapFunction.process(input)
- verify(flatMap).apply(input)
+ verify(first).teardown()
+ verify(second).teardown()
}
- "return passed in description" in {
- flatMapFunction.description shouldBe "flatMap"
- }
-
- "return None on finish" in {
- flatMapFunction.finish() shouldBe List.empty[S]
- }
-
- "do nothing on clearState" in {
- flatMapFunction.clearState()
- verifyZeroInteractions(flatMap)
- }
-
- "return AndThen on andThen" in {
- val other = mock[SingleInputFunction[S, T]]
- flatMapFunction.andThen[T](other) shouldBe an [AndThen[_, _, _]]
- }
- }
-
- "ReduceFunction" should {
-
-
- "call reduce function when processing input value" in {
- val reduce = mock[(T, T) => T]
- val reduceFunction = new ReduceFunction[T](reduce, "reduce")
- val input1 = mock[T]
- val input2 = mock[T]
- val output = mock[T]
-
- when(reduce.apply(input1, input2)).thenReturn(output, output)
-
- reduceFunction.process(input1) shouldBe List.empty[T]
- reduceFunction.process(input2) shouldBe List.empty[T]
- reduceFunction.finish() shouldBe List(output)
-
- reduceFunction.clearState()
- reduceFunction.process(input1) shouldBe List.empty[T]
- reduceFunction.clearState()
- reduceFunction.process(input2) shouldBe List.empty[T]
- reduceFunction.finish() shouldBe List(input2)
- }
-
- "return passed in description" in {
- val reduce = mock[(T, T) => T]
- val reduceFunction = new ReduceFunction[T](reduce, "reduce")
- reduceFunction.description shouldBe "reduce"
- }
-
- "return None on finish" in {
- val reduce = mock[(T, T) => T]
- val reduceFunction = new ReduceFunction[T](reduce, "reduce")
- reduceFunction.finish() shouldBe List.empty[T]
- }
-
- "do nothing on clearState" in {
- val reduce = mock[(T, T) => T]
- val reduceFunction = new ReduceFunction[T](reduce, "reduce")
- reduceFunction.clearState()
- verifyZeroInteractions(reduce)
- }
-
- "return AndThen on andThen" in {
- val reduce = mock[(T, T) => T]
- val reduceFunction = new ReduceFunction[T](reduce, "reduce")
- val other = mock[SingleInputFunction[T, Any]]
- reduceFunction.andThen[Any](other) shouldBe an[AndThen[_, _, _]]
- }
- }
-
- "EmitFunction" should {
-
- val emit = mock[T => Unit]
- val emitFunction = new EmitFunction[T](emit)
-
- "emit input value when processing input value" in {
- val input = mock[T]
-
- emitFunction.process(input) shouldBe List.empty[Unit]
-
- verify(emit).apply(input)
- }
-
- "return empty description" in {
- emitFunction.description shouldBe ""
- }
-
- "return None on finish" in {
- emitFunction.finish() shouldBe List.empty[Unit]
- }
-
- "do nothing on clearState" in {
- emitFunction.clearState()
- verifyZeroInteractions(emit)
- }
-
- "throw exception on andThen" in {
- val other = mock[SingleInputFunction[Unit, Any]]
- intercept[UnsupportedOperationException] {
- emitFunction.andThen(other)
- }
- }
- }
-
- "andThen" should {
"chain multiple single input function" in {
- val split = new FlatMapFunction[String, String](line => line.split("\\s"), "split")
+ val split = new FlatMapper[String, String](FlatMapFunction(_.split("\\s")), "split")
- val filter = new FlatMapFunction[String, String](word =>
- if (word.isEmpty) None else Some(word), "filter")
+ val filter = new FlatMapper[String, String](
+ FlatMapFunction(word => if (word.isEmpty) None else Some(word)), "filter")
- val map = new FlatMapFunction[String, Int](word => Some(1), "map")
+ val map = new FlatMapper[String, Int](FlatMapFunction(word => Some(1)), "map")
- val sum = new ReduceFunction[Int]({ (left, right) => left + right }, "sum")
+ val sum = new Reducer[Int](ReduceFunction({(left, right) => left + right}), "sum")
- val all = split.andThen(filter).andThen(map).andThen(sum)
+ val all = AndThen(split, AndThen(filter, AndThen(map, sum)))
assert(all.description == "split.filter.map.sum")
@@ -239,6 +123,123 @@
}
}
+ "FlatMapper" should {
+
+ val flatMapFunction = mock[FlatMapFunction[R, S]]
+ val flatMapper = new FlatMapper[R, S](flatMapFunction, "flatMap")
+
+ "call flatMap function when processing input value" in {
+ val input = mock[R]
+ flatMapper.process(input)
+ verify(flatMapFunction).apply(input)
+ }
+
+ "return passed in description" in {
+ flatMapper.description shouldBe "flatMap"
+ }
+
+ "return None on finish" in {
+ flatMapper.finish() shouldBe List.empty[S]
+ }
+
+ "set up FlatMapFunction on setup" in {
+ flatMapper.setup()
+
+ verify(flatMapFunction).setup()
+ }
+
+ "tear down FlatMapFunction on teardown" in {
+ flatMapper.teardown()
+
+ verify(flatMapFunction).teardown()
+ }
+ }
+
+ "ReduceFunction" should {
+
+ "call reduce function when processing input value" in {
+ val reduceFunction = mock[ReduceFunction[T]]
+ val reducer = new Reducer[T](reduceFunction, "reduce")
+ val input1 = mock[T]
+ val input2 = mock[T]
+ val output = mock[T]
+
+ when(reduceFunction.apply(input1, input2)).thenReturn(output, output)
+
+ reducer.process(input1) shouldBe List.empty[T]
+ reducer.process(input2) shouldBe List.empty[T]
+ reducer.finish() shouldBe List(output)
+
+ reducer.teardown()
+ reducer.process(input1) shouldBe List.empty[T]
+ reducer.teardown()
+ reducer.process(input2) shouldBe List.empty[T]
+ reducer.finish() shouldBe List(input2)
+ }
+
+ "return passed in description" in {
+ val reduceFunction = mock[ReduceFunction[T]]
+ val reducer = new Reducer[T](reduceFunction, "reduce")
+ reducer.description shouldBe "reduce"
+ }
+
+ "return None on finish" in {
+ val reduceFunction = mock[ReduceFunction[T]]
+ val reducer = new Reducer[T](reduceFunction, "reduce")
+ reducer.finish() shouldBe List.empty[T]
+ }
+
+ "set up reduce function on setup" in {
+ val reduceFunction = mock[ReduceFunction[T]]
+ val reducer = new Reducer[T](reduceFunction, "reduce")
+ reducer.setup()
+
+ verify(reduceFunction).setup()
+ }
+
+ "tear down reduce function on teardown" in {
+ val reduceFunction = mock[ReduceFunction[T]]
+ val reducer = new Reducer[T](reduceFunction, "reduce")
+ reducer.teardown()
+
+ verify(reduceFunction).teardown()
+ }
+ }
+
+ "Emit" should {
+
+ val emitFunction = mock[T => Unit]
+ val emit = new Emit[T](emitFunction)
+
+ "emit input value when processing input value" in {
+ val input = mock[T]
+
+ emit.process(input) shouldBe List.empty[Unit]
+
+ verify(emitFunction).apply(input)
+ }
+
+ "return empty description" in {
+ emit.description shouldBe ""
+ }
+
+ "return None on finish" in {
+ emit.finish() shouldBe List.empty[Unit]
+ }
+
+ "do nothing on setup" in {
+ emit.setup()
+
+ verifyZeroInteractions(emitFunction)
+ }
+
+ "do nothing on teardown" in {
+ emit.teardown()
+
+ verifyZeroInteractions(emitFunction)
+ }
+ }
+
"Source" should {
"iterate over input source and apply attached operator" in {
@@ -261,7 +262,8 @@
// Source with transformer
val anotherTaskContext = MockUtil.mockTaskContext
- val double = new FlatMapFunction[String, String](word => List(word, word), "double")
+ val double = new FlatMapper[String, String](FlatMapFunction(
+ word => List(word, word)), "double")
val another = new DataSourceTask(anotherTaskContext,
conf.withValue(GEARPUMP_STREAMING_OPERATOR, double))
another.onStart(Instant.EPOCH)
@@ -279,9 +281,8 @@
val data = "1 2 2 3 3 3"
- val concat = new ReduceFunction[String]({ (left, right) =>
- left + right
- }, "concat")
+ val concat = new Reducer[String](ReduceFunction({ (left, right) =>
+ left + right}), "concat")
implicit val system = ActorSystem("test", TestUtil.DEFAULT_CONFIG)
val config = UserConfig.empty.withValue[SingleInputFunction[String, String]](
@@ -315,7 +316,8 @@
// Source with transformer
val taskContext = MockUtil.mockTaskContext
val conf = UserConfig.empty
- val double = new FlatMapFunction[String, String](word => List(word, word), "double")
+ val double = new FlatMapper[String, String](FlatMapFunction(
+ word => List(word, word)), "double")
val task = new TransformTask[String, String](Some(double), taskContext, conf)
task.onStart(Instant.EPOCH)
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
similarity index 91%
rename from streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
rename to streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
index db4db93..5b90a3e 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamAppSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamAppSpec.scala
@@ -16,14 +16,15 @@
* limitations under the License.
*/
-package org.apache.gearpump.streaming.dsl
+package org.apache.gearpump.streaming.dsl.scalaapi
import akka.actor.ActorSystem
import org.apache.gearpump.cluster.TestUtil
import org.apache.gearpump.cluster.client.ClientContext
+import org.apache.gearpump.streaming.dsl.scalaapi
import org.apache.gearpump.streaming.partitioner.PartitionerDescription
-import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
import org.apache.gearpump.streaming.source.DataSourceTask
+import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
import org.apache.gearpump.util.Graph
import org.mockito.Mockito.when
import org.scalatest._
@@ -49,8 +50,8 @@
when(context.system).thenReturn(system)
val dsl = StreamApp("dsl", context)
- dsl.source(List("A"), 2, "A") shouldBe a [Stream[_]]
- dsl.source(List("B"), 3, "B") shouldBe a [Stream[_]]
+ dsl.source(List("A"), 2, "A") shouldBe a [scalaapi.Stream[_]]
+ dsl.source(List("B"), 3, "B") shouldBe a [scalaapi.Stream[_]]
val application = dsl.plan()
application shouldBe a [StreamApplication]
diff --git a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
similarity index 94%
rename from streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
rename to streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
index 8def61e..62a3bcb 100644
--- a/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/StreamSpec.scala
+++ b/streaming/src/test/scala/org/apache/gearpump/streaming/dsl/scalaapi/StreamSpec.scala
@@ -16,19 +16,19 @@
* limitations under the License.
*/
-package org.apache.gearpump.streaming.dsl
+package org.apache.gearpump.streaming.dsl.scalaapi
import akka.actor._
import org.apache.gearpump.Message
import org.apache.gearpump.cluster.client.ClientContext
import org.apache.gearpump.cluster.{TestUtil, UserConfig}
-import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription}
-import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
-import org.apache.gearpump.streaming.dsl.StreamSpec.Join
import org.apache.gearpump.streaming.dsl.partitioner.GroupByPartitioner
+import org.apache.gearpump.streaming.dsl.scalaapi.StreamSpec.Join
import org.apache.gearpump.streaming.dsl.task.{CountTriggerTask, TransformTask}
+import org.apache.gearpump.streaming.partitioner.{CoLocationPartitioner, HashPartitioner, PartitionerDescription}
import org.apache.gearpump.streaming.source.DataSourceTask
import org.apache.gearpump.streaming.task.{Task, TaskContext}
+import org.apache.gearpump.streaming.{ProcessorDescription, StreamApplication}
import org.apache.gearpump.util.Graph
import org.apache.gearpump.util.Graph._
import org.mockito.Mockito.when
@@ -71,9 +71,10 @@
map(word => (word, 1)).
groupBy(_._1, parallelism = 2).
reduce((left, right) => (left._1, left._2 + right._2)).
- map[Either[(String, Int), String]](Left(_))
+ map[Either[(String, Int), String]]({t: (String, Int) => Left(t)})
- val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](Right(_))
+ val query = dsl.source(List("two"), 1, "").map[Either[(String, Int), String]](
+ {s: String => Right(s)})
stream.merge(query).process[(String, Int)](classOf[Join], 1)
val app: StreamApplication = dsl.plan()