| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.gearpump.akkastream.scaladsl |
| |
| import akka.stream.Attributes |
| import org.apache.gearpump.akkastream.module._ |
| import akka.stream.scaladsl.{Flow, Keep, Sink, Source} |
| import org.apache.gearpump.cluster.UserConfig |
| import org.apache.gearpump.streaming.sink.DataSink |
| import org.apache.gearpump.streaming.source.DataSource |
| import org.apache.gearpump.streaming.task.Task |
| import org.reactivestreams.{Publisher, Subscriber} |
| |
| |
object GearSource {

  /**
   * Builds a Source whose elements are supplied out of band, i.e. pushed in by
   * a publisher that lives outside this stream graph.
   *
   * {{{
   *   SourceBridgeModule -> Sink
   *        /
   *       V
   *   materializes to a Subscriber
   *       ^
   *       |
   *   upstream Publisher sends the out-of-band messages
   * }}}
   *
   * @tparam IN  element type accepted by the materialized [[Subscriber]]
   * @tparam OUT element type emitted by the returned Source
   * @return a Source that materializes to the [[Subscriber]] feeding it
   */
  def bridge[IN, OUT]: Source[OUT, Subscriber[IN]] = {
    // The dummy source is only a placeholder upstream; the real input arrives
    // through the Subscriber materialized by the bridge module.
    val origin = new Source(new DummySource[IN](Attributes.name("dummy"), Source.shape("dummy")))
    val bridgeFlow = new Flow[IN, OUT, Subscriber[IN]](new SourceBridgeModule[IN, OUT]())
    origin.viaMat(bridgeFlow)(Keep.right)
  }

  /**
   * Builds a Source backed by a Gearpump [[DataSource]].
   *
   * {{{
   *   SourceTaskModule -> downstream Sink
   * }}}
   *
   * @param source the Gearpump data source to read from
   * @tparam OUT element type emitted by the returned Source
   * @return a Source wrapping a `SourceTaskModule` configured with `UserConfig.empty`
   */
  def from[OUT](source: DataSource): Source[OUT, Unit] =
    new Source[OUT, Unit](SourceTaskModule(source, UserConfig.empty))

  /**
   * Builds a Source backed by a Gearpump [[org.apache.gearpump.streaming.Processor]].
   *
   * {{{
   *   ProcessorModule -> downstream Sink
   * }}}
   *
   * @param processor the Gearpump [[Task]] class the processor runs
   * @param conf      configuration handed to the processor
   * @tparam OUT element type emitted by the returned Source
   * @return a Source whose materialized value is that of the processor flow
   */
  def from[OUT](processor: Class[_ <: Task], conf: UserConfig): Source[OUT, Unit] = {
    // Placeholder upstream; the processor stage produces the actual elements.
    val dummy = new DummySource[Unit](Attributes.name("dummy"), Source.shape("dummy"))
    new Source(dummy).viaMat(Processor.apply[Unit, OUT](processor, conf))(Keep.right)
  }
}
| |
object GearSink {

  /**
   * Builds a Sink that forwards the elements it receives to an out-of-band
   * channel.
   *
   * {{{
   *   Source -> SinkBridgeModule
   *                 \
   *                  V
   *             materializes to a Publisher
   *                  \
   *                   V
   *             sends out-of-band messages to a downstream Subscriber
   * }}}
   *
   * @tparam IN  element type consumed by the returned Sink
   * @tparam OUT element type published by the materialized [[Publisher]]
   * @return a Sink that materializes to the [[Publisher]] exposing its input
   */
  def bridge[IN, OUT]: Sink[IN, Publisher[OUT]] = {
    // Placeholder downstream; consumers attach to the materialized Publisher instead.
    val terminal = new Sink(new DummySink[OUT](Attributes.name("dummy"), Sink.shape("dummy")))
    val bridgeFlow = new Flow[IN, OUT, Publisher[OUT]](new SinkBridgeModule[IN, OUT]())
    // `to` keeps the flow's materialized value, i.e. the Publisher.
    bridgeFlow.to(terminal)
  }

  /**
   * Builds a Sink backed by a Gearpump [[DataSink]].
   *
   * {{{
   *   upstream Source -> SinkTaskModule
   * }}}
   *
   * @param sink the Gearpump data sink to write to
   * @tparam IN element type consumed by the returned Sink
   * @return a Sink wrapping a `SinkTaskModule` configured with `UserConfig.empty`
   */
  def to[IN](sink: DataSink): Sink[IN, Unit] =
    new Sink[IN, Unit](new SinkTaskModule(sink, UserConfig.empty))

  /**
   * Builds a Sink backed by a Gearpump [[org.apache.gearpump.streaming.Processor]].
   *
   * {{{
   *   upstream Source -> ProcessorModule
   * }}}
   *
   * @param processor the Gearpump [[Task]] class the processor runs
   * @param conf      configuration handed to the processor
   * @tparam IN element type consumed by the returned Sink
   * @return a Sink whose materialized value is that of the processor flow
   */
  def to[IN](processor: Class[_ <: Task], conf: UserConfig): Sink[IN, Unit] = {
    val terminal = new Sink(new DummySink[Unit](Attributes.name("dummy"), Sink.shape("dummy")))
    Processor.apply[IN, Unit](processor, conf).to(terminal)
  }
}
| |
/**
 * Divides the main input stream into a set of sub-streams, one per group.
 *
 * This is a work-around for the limitations of the official `Flow.groupBy` API.
 *
 * For example, a word count could be sketched like this:
 * {{{
 *   case class KV(key: String, value: String)
 *   case class Count(key: String, count: Int)
 *
 *   val graph = GroupBy[KV, String](_.key).map { kv =>
 *     Count(kv.key, 1)
 *   }.fold(Count(null, 0)) { (base, add) =>
 *     Count(add.key, base.count + add.count)
 *   }.log("count of current key")
 *     .flatten()
 *     .to(sink)
 * }}}
 *
 * `map` and `fold` transform data on every sub-stream. With 10 groups there are
 * 10 sub-streams, and each sub-stream gets its own `map` and `fold`.
 *
 * `flatten` collects all sub-streams back into the main stream.
 *
 * `sink` only operates on the main stream.
 */
object GroupBy {

  /**
   * @param groupBy function computing the group of each element
   * @tparam T     element type (passed through unchanged)
   * @tparam Group type of the group key
   * @return a Flow backed by a `GroupByModule`
   */
  def apply[T, Group](groupBy: T => Group): Flow[T, T, Unit] =
    new Flow[T, T, Unit](new GroupByModule(groupBy))
}
| |
/**
 * Aggregates the stream's elements with a binary combining function. The
 * aggregation itself is carried out by `ReduceModule`.
 *
 * {{{
 *   val flow = Reduce({ (a: Int, b: Int) => a + b })
 * }}}
 */
object Reduce {

  /**
   * @param reduce function combining two elements into one
   * @tparam T element type
   * @return a Flow backed by a `ReduceModule`
   */
  def apply[T](reduce: (T, T) => T): Flow[T, T, Unit] =
    new Flow[T, T, Unit](new ReduceModule(reduce))
}
| |
| |
/**
 * Creates a Flow backed by a Gearpump processor, given the processor's
 * [[Task]] class and its configuration.
 */
object Processor {

  /**
   * @param processor the Gearpump [[Task]] class to run
   * @param conf      configuration handed to the `ProcessorModule`
   * @tparam In  input element type
   * @tparam Out output element type
   * @return a Flow backed by a `ProcessorModule`
   */
  def apply[In, Out](processor: Class[_ <: Task], conf: UserConfig): Flow[In, Out, Unit] = {
    val module = new ProcessorModule[In, Out, Unit](processor, conf)
    new Flow[In, Out, Unit](module)
  }
}
| |
object Implicits {

  /**
   * Extension methods adding groupBy, reduce and Gearpump processor stages to a Source.
   */
  implicit class SourceOps[T, Mat](source: Source[T, Mat]) {

    // TODO: named groupBy2 to avoid clashing with the built-in groupBy.
    // Eventually the built-in groupBy should be replaced with this implementation.
    /** Splits the stream into sub-streams keyed by `groupBy` (see [[GroupBy]]). */
    def groupBy2[Group](groupBy: T => Group): Source[T, Mat] =
      source.via[T, Unit](GroupBy(groupBy))

    /** Aggregates the elements with `reduce` (see [[Reduce]]). */
    def reduce(reduce: (T, T) => T): Source[T, Mat] =
      source.via[T, Unit](Reduce(reduce))

    /** Routes the elements through a Gearpump processor (see [[Processor]]). */
    def process[R](processor: Class[_ <: Task], conf: UserConfig): Source[R, Mat] =
      source.via[R, Unit](Processor[T, R](processor, conf))
  }

  /**
   * Extension methods adding groupBy, reduce and Gearpump processor stages to a Flow.
   */
  implicit class FlowOps[IN, OUT, Mat](flow: Flow[IN, OUT, Mat]) {

    /** Splits the stream into sub-streams keyed by `groupBy` (see [[GroupBy]]). */
    def groupBy2[Group](groupBy: OUT => Group): Flow[IN, OUT, Mat] =
      flow.via[OUT, Unit](GroupBy(groupBy))

    /** Aggregates the elements with `reduce` (see [[Reduce]]). */
    def reduce(reduce: (OUT, OUT) => OUT): Flow[IN, OUT, Mat] =
      flow.via[OUT, Unit](Reduce(reduce))

    /** Routes the elements through a Gearpump processor (see [[Processor]]). */
    def process[R](processor: Class[_ <: Task], conf: UserConfig): Flow[IN, R, Mat] =
      flow.via[R, Unit](Processor[OUT, R](processor, conf))
  }

  /**
   * Extension methods adding groupByKey and value summation to a Source of key-value pairs.
   */
  implicit class KVSourceOps[K, V, Mat](source: Source[(K, V), Mat]) {

    /**
     * Groups the key-value pairs by their key.
     *
     * @return the Source followed by a [[GroupBy]] stage keyed on each pair's first element
     */
    def groupByKey: Source[(K, V), Mat] =
      source.via[(K, V), Unit](GroupBy[(K, V), K](_._1))

    /**
     * Sums the values. When two pairs are combined, the key of the first pair is kept.
     *
     * Apply [[groupByKey]] first so that pairs with the same key are grouped
     * together. Otherwise the values are summed regardless of their key.
     *
     * @param numeric arithmetic used to add two values
     * @return the Source followed by a [[Reduce]] stage that adds the values
     */
    def sumOnValue(implicit numeric: Numeric[V]): Source[(K, V), Mat] =
      source.via[(K, V), Unit](Reduce(addValues[K, V](numeric)))
  }

  /**
   * Extension methods adding groupByKey and value summation to a Flow of key-value pairs.
   */
  implicit class KVFlowOps[K, V, Mat](flow: Flow[(K, V), (K, V), Mat]) {

    /**
     * Groups the key-value pairs by their key.
     *
     * @return the Flow followed by a [[GroupBy]] stage keyed on each pair's first element
     */
    def groupByKey: Flow[(K, V), (K, V), Mat] =
      flow.via[(K, V), Unit](GroupBy[(K, V), K](_._1))

    /**
     * Sums the values. When two pairs are combined, the key of the first pair is kept.
     *
     * Apply [[groupByKey]] first so that pairs with the same key are grouped
     * together. Otherwise the values are summed regardless of their key.
     *
     * @param numeric arithmetic used to add two values
     * @return the Flow followed by a [[Reduce]] stage that adds the values
     */
    def sumOnValue(implicit numeric: Numeric[V]): Flow[(K, V), (K, V), Mat] =
      flow.via[(K, V), Unit](Reduce(addValues[K, V](numeric)))
  }

  /** Combines two pairs into one, keeping the first pair's key and adding both values. */
  private def addValues[K, V](numeric: Numeric[V]): ((K, V), (K, V)) => (K, V) =
    (first, second) => (first._1, numeric.plus(first._2, second._2))
}