| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.heron.streamlet.scala.impl |
| |
| import java.util.{Map => JMap} |
| import java.util.{HashMap => JHashMap} |
| |
| import scala.collection.JavaConverters |
| |
| import org.apache.heron.api.grouping.StreamGrouping |
| import org.apache.heron.streamlet.{ |
| IStreamletOperator, |
| JoinType, |
| KeyValue, |
| KeyedWindow, |
| SerializablePredicate, |
| Streamlet => JavaStreamlet, |
| WindowConfig |
| } |
| import org.apache.heron.streamlet.impl.{StreamletImpl => JavaStreamletImpl} |
| import org.apache.heron.streamlet.impl.streamlets.SupplierStreamlet |
| |
| import org.apache.heron.streamlet.scala.{ |
| SerializableTransformer, |
| Sink, |
| Streamlet |
| } |
| import org.apache.heron.streamlet.scala.converter.ScalaToJavaConverter._ |
| |
object StreamletImpl {

  /**
   * Wraps a Java Streamlet in the Scala Streamlet implementation.
   *
   * @param javaStreamlet The Java Streamlet to wrap
   * @tparam R The element type of the Streamlet
   * @return A Scala Streamlet that delegates to javaStreamlet
   */
  def fromJavaStreamlet[R](javaStreamlet: JavaStreamlet[R]): Streamlet[R] =
    new StreamletImpl[R](javaStreamlet)

  /**
   * Extracts the wrapped Java Streamlet from a Scala Streamlet.
   * Only Streamlets backed by [[StreamletImpl]] can be unwrapped.
   *
   * @param streamlet The Scala Streamlet to unwrap
   * @tparam R The element type of the Streamlet
   * @return The Java Streamlet wrapped by streamlet
   * @throws java.lang.ClassCastException if streamlet is not a [[StreamletImpl]]
   * @throws java.lang.NullPointerException if streamlet is null
   */
  def toJavaStreamlet[R](streamlet: Streamlet[R]): JavaStreamlet[R] =
    streamlet match {
      case impl: StreamletImpl[R @unchecked] => impl.javaStreamlet
      // Same exception types as the previous blind cast, but with a message
      // that tells the caller what went wrong.
      case null =>
        throw new NullPointerException("streamlet must not be null")
      case other =>
        throw new ClassCastException(
          s"Expected ${classOf[StreamletImpl[_]].getName} but got ${other.getClass.getName}")
    }
}
| |
/**
 * Scala implementation of the Streamlet API that wraps a Java Streamlet.
 * User-defined Scala functions are converted to the corresponding Java functional interfaces
 * before being passed to the wrapped Java Streamlet, and each resulting Java Streamlet is
 * wrapped back into a Scala Streamlet.
 */
class StreamletImpl[R](val javaStreamlet: JavaStreamlet[R])
    extends Streamlet[R] {

  import StreamletImpl._

  /**
   * Sets the name of the Streamlet.
   *
   * @param sName The name given by the user for this Streamlet
   * @return Returns back the Streamlet with changed name
   */
  override def setName(sName: String): Streamlet[R] =
    fromJavaStreamlet[R](javaStreamlet.setName(sName))

  /**
   * Gets the name of the Streamlet.
   *
   * @return Returns the name of the Streamlet
   */
  override def getName(): String = javaStreamlet.getName

  /**
   * Sets the number of partitions of the Streamlet.
   *
   * @param numPartitions The user assigned number of partitions
   * @return Returns back the Streamlet with changed number of partitions
   */
  override def setNumPartitions(numPartitions: Int): Streamlet[R] =
    fromJavaStreamlet[R](javaStreamlet.setNumPartitions(numPartitions))

  /**
   * Gets the number of partitions of this Streamlet.
   *
   * @return the number of partitions of this Streamlet
   */
  override def getNumPartitions(): Int = javaStreamlet.getNumPartitions

  /**
   * Sets the id of the stream to be used by the children nodes.
   * Usage (assuming source is a Streamlet object with two output streams: stream1 and stream2):
   * {{{
   *   source.withStream("stream1").filter(...).log()
   *   source.withStream("stream2").filter(...).log()
   * }}}
   *
   * @param streamId The specified stream id
   * @return Returns back the Streamlet with changed stream id
   */
  override def withStream(streamId: String): Streamlet[R] =
    fromJavaStreamlet[R](javaStreamlet.withStream(streamId))

  /**
   * Gets the stream id of this Streamlet.
   *
   * @return the stream id of this Streamlet
   */
  override def getStreamId(): String = javaStreamlet.getStreamId

  /**
   * Returns a new Streamlet by applying mapFn to each element of this Streamlet.
   *
   * @param mapFn The Map Function that should be applied to each element
   * @tparam T The element type of the resulting Streamlet
   */
  override def map[T](mapFn: R => T): Streamlet[T] = {
    val serializableFunction = toSerializableFunction[R, T](mapFn)
    val newJavaStreamlet = javaStreamlet.map[T](serializableFunction)
    fromJavaStreamlet[T](newJavaStreamlet)
  }

  /**
   * Returns a new Streamlet by applying flatMapFn to each element of this Streamlet and
   * flattening the result.
   *
   * @param flatMapFn The FlatMap Function that should be applied to each element
   * @tparam T The element type of the resulting Streamlet
   */
  override def flatMap[T](flatMapFn: R => Iterable[_ <: T]): Streamlet[T] = {
    val serializableFunction =
      toSerializableFunctionWithIterable[R, T](flatMapFn)
    val newJavaStreamlet = javaStreamlet.flatMap[T](serializableFunction)
    fromJavaStreamlet[T](newJavaStreamlet)
  }

  /**
   * Returns a new Streamlet by applying the filterFn on each element of this Streamlet
   * and including only those elements that satisfy the filterFn.
   *
   * @param filterFn The filter Function that should be applied to each element
   */
  override def filter(filterFn: R => Boolean): Streamlet[R] = {
    val serializablePredicate = toSerializablePredicate[R](filterFn)
    val newJavaStreamlet = javaStreamlet.filter(serializablePredicate)
    fromJavaStreamlet[R](newJavaStreamlet)
  }

  /**
   * Same as filter(filterFn).setNumPartitions(nPartitions) where filterFn is identity.
   *
   * @param numPartitions The number of partitions of the resulting Streamlet
   */
  override def repartition(numPartitions: Int): Streamlet[R] = {
    val newJavaStreamlet = javaStreamlet.repartition(numPartitions)
    fromJavaStreamlet[R](newJavaStreamlet)
  }

  /**
   * A more generalized version of repartition where a user can determine which partitions
   * any particular tuple should go to. For each element of the current Streamlet, the user
   * supplied partitionFn is invoked passing in the element as the first argument. The second
   * argument is the number of partitions of the downstream Streamlet. The partitionFn should
   * return 0 or more unique numbers between 0 and n partitions to indicate which partitions
   * this element should be routed to.
   *
   * @param numPartitions The number of partitions of the resulting Streamlet
   * @param partitionFn   The function that selects the target partitions for an element
   */
  override def repartition(numPartitions: Int,
                           partitionFn: (R, Int) => Seq[Int]): Streamlet[R] = {
    val partitionFunction = toSerializableBiFunctionWithSeq[R](partitionFn)
    val newJavaStreamlet =
      javaStreamlet.repartition(numPartitions, partitionFunction)
    fromJavaStreamlet[R](newJavaStreamlet)
  }

  /**
   * Clones the current Streamlet. It returns a sequence of numClones Streamlets where each
   * Streamlet contains all the tuples of the current Streamlet.
   *
   * @param numClones The number of clones to create
   * @return An immutable sequence holding one Scala wrapper per cloned Java Streamlet
   */
  override def clone(numClones: Int): Seq[Streamlet[R]] = {
    val javaClonedStreamlets = javaStreamlet.clone(numClones)
    // Copy into an immutable List so callers never see the mutable buffer
    // view that backs the Java list.
    JavaConverters
      .asScalaBufferConverter(javaClonedStreamlets)
      .asScala
      .map(streamlet => fromJavaStreamlet[R](streamlet))
      .toList
  }

  /**
   * Returns a new Streamlet by inner joining this Streamlet with the 'other' Streamlet.
   * The join is done over elements accumulated over a time window defined by windowCfg.
   * The elements are compared using the thisKeyExtractor for this Streamlet with the
   * otherKeyExtractor for the other Streamlet. On each matching pair, the joinFunction is applied.
   *
   * @param other             The Streamlet that we are joining with.
   * @param thisKeyExtractor  The function applied to a tuple of this Streamlet to get the key
   * @param otherKeyExtractor The function applied to a tuple of the other Streamlet to get the key
   * @param windowCfg         This is a specification of what kind of windowing strategy you like
   *                          to have. Typical windowing strategies are sliding windows and
   *                          tumbling windows
   * @param joinFunction      The join function that needs to be applied
   */
  override def join[K, S, T](
      other: Streamlet[S],
      thisKeyExtractor: R => K,
      otherKeyExtractor: S => K,
      windowCfg: WindowConfig,
      joinFunction: (R, S) => T): Streamlet[KeyValue[KeyedWindow[K], T]] = {
    val javaOtherStreamlet = toJavaStreamlet[S](other)
    val javaThisKeyExtractor = toSerializableFunction[R, K](thisKeyExtractor)
    val javaOtherKeyExtractor = toSerializableFunction[S, K](otherKeyExtractor)
    val javaJoinFunction = toSerializableBiFunction[R, S, T](joinFunction)

    val newJavaStreamlet = javaStreamlet.join[K, S, T](javaOtherStreamlet,
                                                       javaThisKeyExtractor,
                                                       javaOtherKeyExtractor,
                                                       windowCfg,
                                                       javaJoinFunction)
    fromJavaStreamlet[KeyValue[KeyedWindow[K], T]](newJavaStreamlet)
  }

  /**
   * Returns a new KVStreamlet by joining this Streamlet with the 'other' Streamlet. The type of
   * joining is declared by the joinType parameter.
   * The join is done over elements accumulated over a time window defined by windowCfg.
   * The elements are compared using the thisKeyExtractor for this Streamlet with the
   * otherKeyExtractor for the other Streamlet. On each matching pair, the joinFunction is applied.
   * Types of joins: [[JoinType]]
   *
   * @param other             The Streamlet that we are joining with.
   * @param thisKeyExtractor  The function applied to a tuple of this Streamlet to get the key
   * @param otherKeyExtractor The function applied to a tuple of the other Streamlet to get the key
   * @param windowCfg         This is a specification of what kind of windowing strategy you like
   *                          to have. Typical windowing strategies are sliding windows and
   *                          tumbling windows
   * @param joinType          Type of Join. Options: [[JoinType]]
   * @param joinFunction      The join function that needs to be applied
   */
  override def join[K, S, T](
      other: Streamlet[S],
      thisKeyExtractor: R => K,
      otherKeyExtractor: S => K,
      windowCfg: WindowConfig,
      joinType: JoinType,
      joinFunction: (R, S) => T): Streamlet[KeyValue[KeyedWindow[K], T]] = {
    val javaOtherStreamlet = toJavaStreamlet[S](other)
    val javaThisKeyExtractor = toSerializableFunction[R, K](thisKeyExtractor)
    val javaOtherKeyExtractor = toSerializableFunction[S, K](otherKeyExtractor)
    val javaJoinFunction = toSerializableBiFunction[R, S, T](joinFunction)

    val newJavaStreamlet = javaStreamlet.join[K, S, T](javaOtherStreamlet,
                                                       javaThisKeyExtractor,
                                                       javaOtherKeyExtractor,
                                                       windowCfg,
                                                       joinType,
                                                       javaJoinFunction)
    fromJavaStreamlet[KeyValue[KeyedWindow[K], T]](newJavaStreamlet)
  }

  /**
   * Returns a new Streamlet accumulating tuples of this Streamlet over a Window defined by
   * windowCfg and applying reduceFn on those tuples.
   *
   * @param keyExtractor   The function applied to a tuple of this Streamlet to get the key
   * @param valueExtractor The function applied to a tuple of this Streamlet to extract the value
   *                       to be reduced on
   * @param windowCfg      This is a specification of what kind of windowing strategy you like to
   *                       have. Typical windowing strategies are sliding windows and tumbling
   *                       windows
   * @param reduceFn       The reduce function that you want to apply to all the values of a key.
   */
  override def reduceByKeyAndWindow[K, V](
      keyExtractor: R => K,
      valueExtractor: R => V,
      windowCfg: WindowConfig,
      reduceFn: (V, V) => V): Streamlet[KeyValue[KeyedWindow[K], V]] = {
    val javaKeyExtractor = toSerializableFunction[R, K](keyExtractor)
    val javaValueExtractor = toSerializableFunction[R, V](valueExtractor)
    val javaReduceFunction = toSerializableBinaryOperator[V](reduceFn)

    val newJavaStreamlet = javaStreamlet.reduceByKeyAndWindow[K, V](
      javaKeyExtractor,
      javaValueExtractor,
      windowCfg,
      javaReduceFunction)
    fromJavaStreamlet[KeyValue[KeyedWindow[K], V]](newJavaStreamlet)
  }

  /**
   * Returns a new Streamlet accumulating tuples of this Streamlet over a Window defined by
   * windowCfg and applying reduceFn on those tuples. For each window, the value identity is used
   * as an initial value. All the matching tuples are reduced using reduceFn starting from this
   * initial value.
   *
   * @param keyExtractor The function applied to a tuple of this Streamlet to get the key
   * @param windowCfg    This is a specification of what kind of windowing strategy you like to
   *                     have. Typical windowing strategies are sliding windows and tumbling
   *                     windows
   * @param identity     The identity element is both the initial value inside the reduction
   *                     window and the default result if there are no elements in the window
   * @param reduceFn     The reduce function takes two parameters: a partial result of the
   *                     reduction and the next element of the stream. It returns a new partial
   *                     result.
   */
  override def reduceByKeyAndWindow[K, T](
      keyExtractor: R => K,
      windowCfg: WindowConfig,
      identity: T,
      reduceFn: (T, R) => T): Streamlet[KeyValue[KeyedWindow[K], T]] = {
    val javaKeyExtractor = toSerializableFunction[R, K](keyExtractor)
    val javaReduceFunction = toSerializableBiFunction[T, R, T](reduceFn)

    val newJavaStreamlet = javaStreamlet.reduceByKeyAndWindow[K, T](
      javaKeyExtractor,
      windowCfg,
      identity,
      javaReduceFunction)
    fromJavaStreamlet[KeyValue[KeyedWindow[K], T]](newJavaStreamlet)
  }

  /**
   * Returns a new Streamlet that is the union of this and the 'other' Streamlet. Essentially
   * the new Streamlet will contain tuples belonging to both Streamlets.
   *
   * @param other The Streamlet to merge with this one
   */
  override def union(other: Streamlet[_ <: R]): Streamlet[R] = {
    val newJavaStreamlet = javaStreamlet.union(toJavaStreamlet(other))
    fromJavaStreamlet(newJavaStreamlet)
  }

  /**
   * Returns a new Streamlet by applying the transformFunction on each element of this Streamlet.
   * Before starting to cycle the transformFunction over the Streamlet, the open function is
   * called. This allows the transform Function to do any kind of initialization/loading, etc.
   *
   * @param serializableTransformer The transformation function to be applied
   * @tparam T The return type of the transform
   * @return Streamlet containing the output of the transformFunction
   */
  override def transform[T](
      serializableTransformer: SerializableTransformer[R, _ <: T])
    : Streamlet[T] = {
    val javaSerializableTransformer =
      toSerializableTransformer[R, T](serializableTransformer)
    val newJavaStreamlet =
      javaStreamlet.transform[T](javaSerializableTransformer)
    fromJavaStreamlet(newJavaStreamlet)
  }

  /**
   * Returns a new Streamlet by applying the operator on each element of this Streamlet.
   *
   * @param operator The operator to be applied
   * @tparam T The return type of the transform
   * @return Streamlet containing the output of the operation
   */
  override def applyOperator[T](operator: IStreamletOperator[R, T]): Streamlet[T] = {
    val newJavaStreamlet = javaStreamlet.applyOperator[T](operator)
    fromJavaStreamlet(newJavaStreamlet)
  }

  /**
   * Returns a new Streamlet by applying the operator on each element of this Streamlet.
   *
   * @param operator The operator to be applied
   * @param grouper  The grouper to be applied with the operator
   * @tparam T The return type of the transform
   * @return Streamlet containing the output of the operation
   */
  override def applyOperator[T](operator: IStreamletOperator[R, T],
                                grouper: StreamGrouping): Streamlet[T] = {
    val newJavaStreamlet = javaStreamlet.applyOperator[T](operator, grouper)
    fromJavaStreamlet(newJavaStreamlet)
  }

  /**
   * Returns multiple streams by splitting the incoming stream.
   *
   * @param splitFns The Split Functions, keyed by target stream id, that test if the tuple
   *                 should be emitted into each stream. Note that there could be 0 or multiple
   *                 target stream ids
   */
  override def split(splitFns: Map[String, R => Boolean]): Streamlet[R] = {
    // The Java API takes a java.util.Map of SerializablePredicate, so each
    // Scala predicate is converted and copied into a fresh HashMap.
    val javaSerializablePredicates: JMap[String, SerializablePredicate[R]] = new JHashMap()
    splitFns.foreach { case (key, func) =>
      javaSerializablePredicates.put(key, toSerializablePredicate[R](func))
    }
    val newJavaStreamlet = javaStreamlet.split(javaSerializablePredicates)
    fromJavaStreamlet[R](newJavaStreamlet)
  }

  /**
   * Logs every element of the Streamlet using String.valueOf function.
   * This is one of the sink functions in the sense that this operation returns void.
   */
  override def log(): Unit = javaStreamlet.log()

  /**
   * Applies the consumer function to every element of the stream.
   * This function does not return anything.
   *
   * @param consumer The user supplied consumer function that is invoked for each element
   *                 of this Streamlet.
   */
  override def consume(consumer: R => Unit): Unit = {
    val serializableConsumer = toSerializableConsumer[R](consumer)
    javaStreamlet.consume(serializableConsumer)
  }

  /**
   * Applies the sink's put function to every element of the stream.
   * This function does not return anything.
   *
   * @param sink The Sink whose put method consumes each element
   *             of this Streamlet.
   */
  override def toSink(sink: Sink[R]): Unit = {
    val javaSink = toJavaSink[R](sink)
    javaStreamlet.toSink(javaSink)
  }

  /**
   * Gets all the children of this Streamlet.
   * Children of a Streamlet are Streamlets that are resulting from transformations of elements
   * of this and potentially other Streamlets.
   *
   * @return The kid Streamlets
   */
  private[impl] def getChildren: List[JavaStreamletImpl[_]] = {
    // getChildren is only reachable through the Java implementation class,
    // hence the cast of the wrapped Streamlet.
    val javaChildren =
      javaStreamlet
        .asInstanceOf[JavaStreamletImpl[_]]
        .getChildren
    // Explicit conversion instead of the deprecated implicit JavaConversions.
    JavaConverters
      .iterableAsScalaIterableConverter(javaChildren)
      .asScala
      .toList
  }
}