To use the Heron Streamlet API for Scala, you'll need to install the heron-api library. To do so, add this to the dependencies block of your pom.xml configuration file:

    <dependency>
      <groupId>org.apache.heron</groupId>
      <artifactId>heron-api</artifactId>
      <version>{{< heronVersion >}}</version>
    </dependency>
In order to run a Scala topology created using the Heron Streamlet API in a Heron cluster, you'll need to package your topology as a “fat” JAR with dependencies included. You can use the Maven Assembly Plugin to generate JARs with dependencies. To install the plugin and add a Maven goal for a single JAR, add this to the plugins block in your pom.xml:

    <plugin>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
        <archive>
          <manifest>
            <mainClass></mainClass>
          </manifest>
        </archive>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
Once your pom.xml is properly set up, you can compile the JAR with dependencies using this command:
$ mvn assembly:assembly
By default, this will add a JAR in your project's target folder with the name PROJECT-NAME-VERSION-jar-with-dependencies.jar. Here's an example topology submission command using a compiled JAR:

    $ mvn assembly:assembly
    $ heron submit local \
      target/my-project-1.2.3-jar-with-dependencies.jar \
      com.example.Main \
      MyTopology arg1 arg2
Every Streamlet API topology needs to be configured using a Config object. Here's an example default configuration:

    import org.apache.heron.streamlet.Config
    import org.apache.heron.streamlet.scala.Runner

    val topologyConfig = Config.defaultConfig()

    // Apply topology configuration using the topologyConfig object
    val topologyRunner = new Runner()
    topologyRunner.run("name-for-topology", topologyConfig, topologyBuilder)
The table below shows the configurable parameters for Heron topologies:
Parameter | Default |
---|---|
Delivery semantics | At most once |
Serializer | Kryo |
Total number of containers | 2 |
Per-container CPU | 1.0 |
Per-container RAM | 100 MB |
Here's an example non-default configuration:
    val topologyConfig = Config.newBuilder()
      .setNumContainers(5)
      .setPerContainerRamInGigabytes(10)
      .setPerContainerCpu(3.5f)
      .setDeliverySemantics(Config.DeliverySemantics.EFFECTIVELY_ONCE)
      .setSerializer(Config.Serializer.JAVA)
      .setUserConfig("some-key", "some-value")
      .build()
You can apply delivery semantics to a Streamlet API topology like this:
    topologyConfig
      .setDeliverySemantics(Config.DeliverySemantics.EFFECTIVELY_ONCE)

The other available options in the DeliverySemantics enum are ATMOST_ONCE and ATLEAST_ONCE.
In the Heron Streamlet API for Scala, processing graphs consist of streamlets. One or more supplier streamlets inject data into your graph to be processed by downstream operators.
Operation | Description | Example |
---|---|---|
map | Create a new streamlet by applying the supplied mapping function to each element in the original streamlet | Add 1 to each element in a streamlet of integers |
flatMap | Like a map operation but with the important difference that each element of the streamlet is flattened | Flatten a sentence into individual words |
filter | Create a new streamlet containing only the elements that satisfy the supplied filtering function | Remove all inappropriate words from a streamlet of strings |
union | Unifies two streamlets into one, without modifying the elements of the two streamlets | Unite two different Streamlet[String]s into a single streamlet |
clone | Creates any number of identical copies of a streamlet | Create three separate streamlets from the same source |
transform | Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations) | |
join | Create a new streamlet by combining two separate key-value streamlets into one on the basis of each element's key. Supported join types: Inner (the default), Outer-Left, Outer-Right and Outer | Combine key-value pairs listing current scores (e.g. ("h4x0r", 127)) for each user into a single per-user stream |
keyBy | Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet | |
reduceByKey | Produces a streamlet of key-value pairs, one per key, in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered |
reduceByKeyAndWindow | Produces a streamlet of key-value pairs, one per key within each time window, in accordance with a reduce function that you apply to all the accumulated values | Count the number of times a value has been encountered within a specified time window |
countByKey | A special reduce operation that counts the number of tuples for each key | Count the number of times a value has been encountered |
countByKeyAndWindow | A special reduce operation that counts the number of tuples for each key within a time window | Count the number of times a value has been encountered within a specified time window |
split | Split a streamlet into multiple streamlets with different ids | |
withStream | Select the stream with a given id from a streamlet that contains multiple streams | |
applyOperator | Returns a new streamlet by applying a user-defined operator to the original streamlet | Apply an existing bolt as an operator |
repartition | Create a new streamlet by applying a new parallelism level to the original streamlet | Increase the parallelism of a streamlet from 5 to 10 |
toSink | Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc. | Store processing graph results in an AWS Redshift table |
log | Logs the final results of a processing graph to stdout. This must be the last step in the graph. | |
consume | Consume operations are like sink operations except they don't require implementing a full sink interface (consume operations are thus suited for simple operations like logging) | Log processing graph results using a custom formatting function |
Map operations create a new streamlet by applying the supplied mapping function to each element in the original streamlet. Here's an example:
    builder.newSource(() => 1)
      .map[Int]((i: Int) => i + 12) // or, using placeholder syntax: .map[Int](_ + 12)

In this example, a supplier streamlet emits an indefinite series of 1s. The map operation then adds 12 to each incoming element, producing a streamlet of 13s.
FlatMap operations are like map operations but with the important difference that the supplied function returns a collection for each element, and the elements of those collections are “flattened” into the resulting streamlet. In this example, a supplier streamlet emits the same sentence over and over again; the flatMap operation splits each sentence into individual words:

    builder.newSource(() => "I have nothing to declare but my genius")
      .flatMap[String](_.split(" "))

The effect of this operation is to transform a Streamlet[String] of sentences into a Streamlet[String] of individual words.
One of the core differences between map and flatMap operations is that the function passed to a flatMap operation typically transforms a non-collection type into a collection type, whose elements are then emitted individually.
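The difference between the two operations can be sketched with plain Scala collections. This is only an analogy: a List stands in for a streamlet, and the object name MapVsFlatMapSketch is made up for illustration.

```scala
// Plain Scala collections used as a stand-in for streamlets (an analogy only).
object MapVsFlatMapSketch {
  val sentences: List[String] = List("I have nothing to declare but my genius")

  // map: exactly one output element (here, an array of words) per input element
  val mapped: List[Array[String]] = sentences.map(_.split(" "))

  // flatMap: the per-element collections are flattened into the output
  val flatMapped: List[String] = sentences.flatMap(_.split(" "))

  def main(args: Array[String]): Unit = {
    println(mapped.length)     // number of sentences
    println(flatMapped.length) // number of words
  }
}
```

With map, the result still has one element per sentence; with flatMap, it has one element per word.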
Filter operations retain only those elements of a streamlet that satisfy a provided filtering function, excluding all others. Here's an example:

    import java.util.concurrent.ThreadLocalRandom

    builder.newSource(() => ThreadLocalRandom.current().nextInt(1, 11))
      .filter(_ < 7)

In this example, a source streamlet consisting of random integers between 1 and 10 is modified by a filter operation that keeps only the elements lower than 7 and removes all elements that are 7 or higher.
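As the operations table notes, a filter yields only the elements that satisfy the supplied function. A minimal sketch on a plain Scala List (standing in for a streamlet; FilterSketch is an illustrative name) makes the direction of the predicate explicit:

```scala
// Plain Scala collections used as a stand-in for a streamlet (an analogy only).
object FilterSketch {
  val input: List[Int] = (1 to 10).toList

  // Elements for which the predicate is true are kept
  val kept: List[Int] = input.filter(_ < 7)

  // Elements for which the predicate is false are dropped
  val dropped: List[Int] = input.filterNot(_ < 7)
}
```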
Union operations combine two streamlets of the same type into a single streamlet without modifying the elements. Here's an example:
    val flowers = builder.newSource(() => "flower")
    val butterflies = builder.newSource(() => "butterfly")

    val combinedSpringStreamlet = flowers.union(butterflies)

Here, one streamlet is an endless series of “flowers” while the other is an endless series of “butterflies”. The union operation combines them into a single streamlet of interleaved “flowers” and “butterflies”.
Clone operations enable you to create any number of “copies” of a streamlet. Each of the “copy” streamlets contains all the elements of the original and can be manipulated just like the original streamlet. Here's an example:
    import scala.util.Random

    val integers = builder.newSource(() => Random.nextInt(100))

    val copies = integers.clone(5)
    val ints1 = copies.get(0)
    val ints2 = copies.get(1)
    val ints3 = copies.get(2)
    // and so on...

In this example, a streamlet of random integers between 0 and 99 is cloned into 5 identical streamlets.
Transform operations are highly flexible operations that are most useful for transformations that don't neatly map onto the other available operations, such as those that keep state.

Transform operations require you to implement three different methods:

- A setup function that enables you to pass a context object to the operation and to specify what happens prior to the transform step
- A transform operation that performs the desired transformation
- A cleanup function that allows you to specify what happens after the transform step

The context object available to a transform operation provides access to, among other things, the current state of the topology.
Here's a Scala example of a transform operation in a topology where a stateful record is kept of the number of items processed:
    import java.util.concurrent.atomic.AtomicLong

    import org.apache.heron.streamlet.Context
    import org.apache.heron.streamlet.scala.SerializableTransformer

    class CountNumberOfItems extends SerializableTransformer[String, String] {
      private val numberOfItems = new AtomicLong()

      override def setup(context: Context): Unit = {
        numberOfItems.incrementAndGet()
        context.getState().put("number-of-items", numberOfItems)
      }

      override def transform(i: String, f: String => Unit): Unit = {
        val transformedString = i.toUpperCase
        f(transformedString)
      }

      override def cleanup(): Unit =
        println(s"Successfully processed new state: $numberOfItems")
    }
This operation does a few things:

- In the setup method, the Context object is used to access the current state (which has the semantics of a Java Map). The current number of items processed is incremented by one and then saved as the new state.
- In the transform method, the incoming string is converted to upper case and then “accepted” as the new value.
- In the cleanup step, the current count of items processed is logged.

Here's that operation within the context of a streamlet processing graph:
    builder.newSource(() => "Some string over and over")
      .transform(new CountNumberOfItems())
      .log()
For a more in-depth conceptual discussion of joins, see the Heron Streamlet API doc.
Join operations unify two streamlets on a key (join operations thus require KV streamlets). Each KeyValue object in a streamlet has, by definition, a key. When a join operation is added to a processing graph, elements of the two streamlets are combined on the basis of their keys, as in this example:
    import org.apache.heron.streamlet.{Config, KeyValue, WindowConfig}
    import org.apache.heron.streamlet.scala.Builder

    val builder = Builder.newBuilder()

    val streamlet1 = builder
      .newSource(() => new KeyValue[String, String]("heron-api", "topology-api"))
      .setName("streamlet1")

    val streamlet2 = builder
      .newSource(() => new KeyValue[String, String]("heron-api", "streamlet-api"))
      .setName("streamlet2")

    streamlet1.join[KeyValue[String, String], KeyValue[String, String], String](
      streamlet2,
      (kv: KeyValue[String, String]) => kv,
      (kv: KeyValue[String, String]) => kv,
      WindowConfig.TumblingCountWindow(10),
      (kv1: KeyValue[String, String], kv2: KeyValue[String, String]) =>
        kv1.getValue + " - " + kv2.getValue
    )
In this case, the resulting streamlet would consist of an indefinite stream with two KeyValue objects with the key heron-api but different values (topology-api and streamlet-api).

The effect of a join operation is to create a new streamlet for each key.
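To illustrate what joining on a key means for the default inner join type, here is a minimal sketch on plain Scala tuples. It is not Heron's implementation: the two Seq values stand in for the contents of a single window of two key-value streamlets, and the names InnerJoinSketch, scores and teams, as well as the sample data, are made up for illustration.

```scala
// Plain Scala tuples used as a stand-in for one window of two KV streamlets.
object InnerJoinSketch {
  val scores: Seq[(String, Int)] = Seq("h4x0r" -> 127, "n00b" -> 3)
  val teams: Seq[(String, String)] = Seq("h4x0r" -> "red")

  // Inner join: emit a combined value only for keys present on both sides
  val joined: Seq[(String, String)] =
    for {
      (user, score) <- scores
      (otherUser, team) <- teams
      if user == otherUser
    } yield user -> s"$score - $team"
}
```

The key "n00b" appears on only one side, so an inner join produces no output for it; an outer join type would keep it.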
Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet. Here is an example:
    val builder = Builder.newBuilder()

    builder
      .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
      // Convert each sentence into individual words
      .flatMap[String](_.split(" "))
      .keyBy[String, Int](
        // Key extractor (in this case, each word acts as the key)
        (word: String) => word,
        // Value extractor (get the length of each word)
        (word: String) => word.length
      )
      // The result is logged
      .log();
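The pairs produced by the key and value extractors above can be previewed with plain Scala collections. This sketch only mirrors the two extractor functions on a List; KeyBySketch is an illustrative name, and Scala tuples stand in for Heron's key-value objects.

```scala
// Plain Scala collections used as a stand-in for a streamlet (an analogy only).
object KeyBySketch {
  val words: List[String] =
    "Paco de Lucia is one of the most popular virtuoso".split(" ").toList

  // Key extractor: the word itself; value extractor: the word's length
  val keyed: List[(String, Int)] = words.map(word => (word, word.length))
}
```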
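You can apply reduce operations to streamlets by specifying:

- A key extractor that determines what counts as the key for each element
- A value extractor that determines the value for each element
- A reduce function that combines the accumulated values for each key

Reduce by key operations produce a new streamlet of key-value objects (each of which includes the extracted key and the calculated value). Here's an example: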
    val builder = Builder.newBuilder()

    builder
      .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
      // Convert each sentence into individual words
      .flatMap[String](_.split(" "))
      .reduceByKey[String, Int](
        // Key extractor (in this case, each word acts as the key)
        (word: String) => word,
        // Value extractor (each word appears only once, hence the value is always 1)
        (word: String) => 1,
        // Reduce operation (a running sum)
        (x: Int, y: Int) => x + y)
      // The result is logged
      .log();
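The interplay of key extractor, value extractor and reduce function can be sketched as a fold over a plain Scala Seq. This is an illustration of the idea, not Heron's implementation: the helper reduceByKey below is a hypothetical local function, and it returns only the final totals rather than a stream of updates.

```scala
// A hypothetical, collection-based analogue of a reduce-by-key operation.
object ReduceByKeySketch {
  def reduceByKey[T, K, V](items: Seq[T])(
      key: T => K,
      value: T => V,
      reduce: (V, V) => V): Map[K, V] =
    items.foldLeft(Map.empty[K, V]) { (acc, item) =>
      val k = key(item)
      val v = value(item)
      // First value for a key is stored as-is; later values are reduced into it
      acc.updated(k, acc.get(k).map(reduce(_, v)).getOrElse(v))
    }

  val words: Seq[String] = Seq("one", "of", "one", "the", "of", "one")

  // Same extractors and running sum as in the word-count example
  val counts: Map[String, Int] =
    reduceByKey[String, String, Int](words)(w => w, _ => 1, (x, y) => x + y)
}
```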
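You can apply windowed reduce operations to streamlets by specifying:

- A key extractor that determines what counts as the key for each element
- A value extractor that determines the value for each element
- A window configuration that determines the window across which the operation takes place
- A reduce function that combines the accumulated values for each key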
Reduce by key and window operations produce a new streamlet of key-value window objects (which include a key-value pair including the extracted key and calculated value, as well as information about the window in which the operation took place). Here's an example:
    import org.apache.heron.streamlet.WindowConfig;

    val builder = Builder.newBuilder()

    builder
      .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
      // Convert each sentence into individual words
      .flatMap[String](_.split(" "))
      .reduceByKeyAndWindow[String, Int](
        // Key extractor (in this case, each word acts as the key)
        (word: String) => word,
        // Value extractor (each word appears only once, hence the value is always 1)
        (word: String) => 1,
        // Window configuration
        WindowConfig.TumblingCountWindow(50),
        // Reduce operation (a running sum)
        (x: Int, y: Int) => x + y)
      // The result is logged
      .log();
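Assuming that a tumbling count window groups elements into consecutive, non-overlapping batches of a fixed size, the per-window reduction can be sketched with plain Scala collections. The window size of 3 and the name TumblingCountWindowSketch are chosen only to keep the illustration small.

```scala
// Plain Scala collections used as a stand-in for a windowed streamlet.
object TumblingCountWindowSketch {
  val words: Seq[String] = Seq("a", "b", "a", "a", "b", "c")
  val windowSize = 3

  // grouped() yields consecutive, non-overlapping batches of windowSize elements;
  // each batch is then counted independently of the others
  val perWindowCounts: List[Map[String, Int]] =
    words.grouped(windowSize).map { window =>
      window.groupBy(identity).map { case (word, hits) => word -> hits.size }
    }.toList
}
```

Counts do not carry over from one window to the next, which is the main difference from the unwindowed reduce.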
Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered. Here's an example:
    val builder = Builder.newBuilder()

    builder
      .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
      // Convert each sentence into individual words
      .flatMap[String](_.split(" "))
      // Count the number of occurrences of each word
      .countByKey[String]((word: String) => word)
      // The result is logged
      .log();
Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each time window. Here's an example:
    val builder = Builder.newBuilder()

    builder
      .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
      // Convert each sentence into individual words
      .flatMap[String](_.split(" "))
      // Count the number of occurrences of each word within each time window
      .countByKeyAndWindow[String](
        (word: String) => word,
        WindowConfig.TumblingCountWindow(50))
      // The result is logged
      .log();
Split operations split a streamlet into multiple streamlets with different ids by getting the corresponding stream ids from each item in the original streamlet. Here is an example:

    val builder = Builder.newBuilder()

    builder
      .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
      // Convert each sentence into individual words
      .flatMap[String](_.split(" "))
      // Route each word to the "long_word" or "short_word" stream
      .split(Map(
        "long_word" -> { word: String => word.length >= 4 },
        "short_word" -> { word: String => word.length < 4 }
      ))
      // Select only the stream of short words
      .withStream("short_word")
      // The result is logged
      .log();
withStream operations select the stream with a given id from a streamlet that contains multiple streams. They are often used with split.
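The combination of split and withStream can be sketched with plain Scala collections: each stream id is paired with a predicate, and selecting a stream by id returns the elements that matched it. This is an analogy only; SplitSketch and the Map-of-Lists representation are illustrative assumptions, not how Heron stores streams.

```scala
// Plain Scala collections used as a stand-in for split streams (an analogy only).
object SplitSketch {
  val predicates: Map[String, String => Boolean] = Map(
    "long_word"  -> ((word: String) => word.length >= 4),
    "short_word" -> ((word: String) => word.length < 4)
  )

  val words: List[String] =
    "Paco de Lucia is one of the most popular virtuoso".split(" ").toList

  // "split": every stream id receives the elements matching its predicate
  val streams: Map[String, List[String]] =
    predicates.map { case (id, matches) => id -> words.filter(matches) }

  // "withStream": pick one stream by its id
  val shortWords: List[String] = streams("short_word")
}
```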
Apply operator operations apply a user-defined operator (like a bolt) to each element of the original streamlet and return a new streamlet. Here is an example:

    val builder = Builder.newBuilder()

    private class MyBoltOperator extends MyBolt with IStreamletOperator[String, String] {
    }

    builder
      .newSource(() => "Paco de Lucia is one of the most popular virtuoso")
      // Convert each sentence into individual words
      .flatMap[String](_.split(" "))
      // Apply user-defined operation
      .applyOperator(new MyBoltOperator())
      // The result is logged
      .log();
When you assign a number of partitions to a processing step, each step that comes after it inherits that number of partitions. Thus, if you assign 5 partitions to a map operation, then any mapToKV, flatMap, filter, etc. operations that come after it will also be assigned 5 partitions. But you can also change the number of partitions for a processing step (as well as the number of partitions for downstream operations) using repartition. Here's an example:
    import java.util.concurrent.ThreadLocalRandom;

    val builder = Builder.newBuilder

    val numbers = builder
      .newSource(() => ThreadLocalRandom.current().nextInt(1, 11))

    numbers
      .setNumPartitions(5)
      .map(i => i + 1)
      .repartition(2)
      // Keep only values above 7 or below 2
      .filter(i => i > 7 || i < 2)
      .log()
In this example, the supplier streamlet emits random integers between 1 and 10. That operation is assigned 5 partitions. After the map operation, the repartition function is used to assign 2 partitions to all downstream operations.
In processing graphs like the ones you build using the Heron Streamlet API, sinks are essentially the terminal points in your graph, where your processing logic comes to an end. A processing graph can end with writing to a database, publishing to a topic in a pub-sub messaging system, and so on. With the Streamlet API, you can implement your own custom sinks. Here's an example:
    import org.apache.heron.streamlet.Context
    import org.apache.heron.streamlet.scala.Sink

    class FormattedLogSink extends Sink[String] {
      private var streamName: Option[String] = None

      override def setup(context: Context): Unit =
        streamName = Some(context.getStreamName)

      override def put(tuple: String): Unit =
        println(s"The current value of tuple is $tuple in stream: $streamName")

      override def cleanup(): Unit = {}
    }

In this example, the sink fetches the name of the enclosing streamlet from the context passed in the setup method. The put method specifies how the sink handles each element that is received (in this case, a formatted message is logged to stdout). The cleanup method enables you to specify what happens after the element has been processed by the sink.
Here is the FormattedLogSink at work in an example processing graph:

    val builder = Builder.newBuilder

    builder.newSource(() => "Here is a string to be passed to the sink")
      .toSink(new FormattedLogSink)
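The three sink methods can be exercised outside of Heron with a small stand-in. The sketch below assumes that a runtime calls setup once before any elements arrive, put once per element, and cleanup once at the end; that ordering is an assumption made for illustration. SketchSink, SinkLifecycleSketch and drive are hypothetical names, not part of the Heron API.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a sink interface; this is not Heron's Sink class.
trait SketchSink[T] {
  def setup(streamName: String): Unit
  def put(tuple: T): Unit
  def cleanup(): Unit
}

object SinkLifecycleSketch {
  // Records every call so the order can be inspected afterwards
  val calls: ListBuffer[String] = ListBuffer.empty[String]

  val sink: SketchSink[String] = new SketchSink[String] {
    private var name: Option[String] = None

    override def setup(streamName: String): Unit = {
      name = Some(streamName)
      calls += "setup"
    }

    override def put(tuple: String): Unit = {
      val stream = name.getOrElse("unknown")
      calls += s"put($tuple) on $stream"
    }

    override def cleanup(): Unit = calls += "cleanup"
  }

  // Assumed driver loop: setup once, put per element, cleanup once
  def drive(elements: Seq[String]): Unit = {
    sink.setup("sketch-stream")
    elements.foreach(sink.put)
    sink.cleanup()
  }
}
```

Calling `SinkLifecycleSketch.drive(Seq("a", "b"))` records one setup call, two put calls and one cleanup call, in that order.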
Log operations rely on a log sink that is provided out of the box. You'll need to implement other sinks yourself.
Log operations are special cases of consume operations that log streamlet elements to stdout.
Streamlet elements are logged using their toString representations at the INFO level.
Consume operations are like sink operations except they don't require implementing a full sink interface. Consume operations are thus suited for simple operations like formatted logging. Here's an example:

    import scala.util.Random

    val builder = Builder.newBuilder

    builder
      .newSource(() => Random.nextInt(10))
      .filter(i => i % 2 == 0)
      .consume(i => println(s"Even number found: $i"))