---
title: The Heron Streamlet API for Java
description: Create Heron topologies in Java using a functional programming style
---
{{< alert "streamlet-api-beta" >}}
{{content/snippets/heron-streamlet-api.md}}
## Getting started
In order to use the Heron Streamlet API for Java, you'll need to install the `heron-api` library.
### Maven setup
To use the `heron-api` library, add this to the `dependencies` block of your `pom.xml` configuration file:
```xml
<dependency>
<groupId>org.apache.heron</groupId>
<artifactId>heron-api</artifactId>
<version>{{< heronVersion >}}</version>
</dependency>
```
#### Compiling a JAR with dependencies
In order to run a Java topology created using the Heron Streamlet API in a Heron cluster, you'll need to package your topology as a "fat" JAR with dependencies included. You can use the [Maven Assembly Plugin](https://maven.apache.org/plugins/maven-assembly-plugin/usage.html) to generate JARs with dependencies. To install the plugin and add a Maven goal for a single JAR, add this to the `plugins` block in your `pom.xml`:
```xml
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass></mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
```
Once your `pom.xml` is properly set up, you can compile the JAR with dependencies using this command:
```bash
$ mvn assembly:assembly
```
By default, this will create a JAR in your project's `target` folder with the name `PROJECT-NAME-VERSION-jar-with-dependencies.jar`. Here's an example topology submission command using a compiled JAR:
```bash
$ mvn assembly:assembly
$ heron submit local \
target/my-project-1.2.3-jar-with-dependencies.jar \
com.example.Main \
MyTopology arg1 arg2
```
### Java Streamlet API starter project
If you'd like to get up and running quickly with the Heron Streamlet API for Java, you can clone [this repository](https://github.com/streamlio/heron-java-streamlet-api-example), which includes an example topology built using the Streamlet API as well as the necessary Maven configuration. To build a JAR with dependencies for this example topology:
```bash
$ git clone https://github.com/streamlio/heron-java-streamlet-api-example
$ cd heron-java-streamlet-api-example
$ mvn assembly:assembly
$ ls target/*.jar
target/heron-java-streamlet-api-example-latest-jar-with-dependencies.jar
target/heron-java-streamlet-api-example-latest.jar
```
If you're running a [local Heron cluster](../../../getting-started), you can submit the built example topology like this:
```bash
$ heron submit local target/heron-java-streamlet-api-example-latest-jar-with-dependencies.jar \
io.streaml.heron.streamlet.WordCountStreamletTopology \
WordCountStreamletTopology
```
#### Selecting delivery semantics
Heron enables you to apply one of three [delivery semantics](../../../concepts/delivery-semantics) to any Heron topology. For the [example topology](#java-streamlet-api-starter-project) above, you can select the delivery semantics at submit time using the topology's second argument. This command, for example, would apply [effectively-once](../../../concepts/delivery-semantics) semantics to the example topology:
```bash
$ heron submit local target/heron-java-streamlet-api-example-latest-jar-with-dependencies.jar \
io.streaml.heron.streamlet.WordCountStreamletTopology \
WordCountStreamletTopology \
effectively-once
```
The other options are `at-most-once` and `at-least-once`. If you don't explicitly select the delivery semantics, at-most-once semantics will be applied.
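The starter project wires that second argument through to the topology's configuration. Here's a minimal sketch, not taken verbatim from the starter project, of how a topology's `main` method might map the optional argument to the `Config.DeliverySemantics` enum (the argument positions and the default chosen here are illustrative):
```java
import org.apache.heron.streamlet.Builder;
import org.apache.heron.streamlet.Config;
import org.apache.heron.streamlet.Runner;

public class Main {
    public static void main(String[] args) {
        // args[0] is the topology name; args[1], if present, selects the semantics
        Config.DeliverySemantics semantics = Config.DeliverySemantics.ATMOST_ONCE;
        if (args.length > 1 && args[1].equals("effectively-once")) {
            semantics = Config.DeliverySemantics.EFFECTIVELY_ONCE;
        } else if (args.length > 1 && args[1].equals("at-least-once")) {
            semantics = Config.DeliverySemantics.ATLEAST_ONCE;
        }

        Config config = Config.newBuilder()
            .setDeliverySemantics(semantics)
            .build();

        Builder builder = Builder.newBuilder();
        // ... construct the processing graph on the builder here ...

        new Runner().run(args[0], config, builder);
    }
}
```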
## Streamlet API topology configuration
Every Streamlet API topology needs to be configured using a `Config` object. Here's an example default configuration:
```java
import org.apache.heron.streamlet.Builder;
import org.apache.heron.streamlet.Config;
import org.apache.heron.streamlet.Runner;

Builder topologyBuilder = Builder.newBuilder();
Config topologyConfig = Config.defaultConfig();
// Apply topology configuration using the topologyConfig object
Runner topologyRunner = new Runner();
topologyRunner.run("name-for-topology", topologyConfig, topologyBuilder);
```
The table below shows the configurable parameters for Heron topologies:
Parameter | Default
:---------|:-------
[Delivery semantics](#delivery-semantics) | At most once
Serializer | [Kryo](https://github.com/EsotericSoftware/kryo)
Number of containers | 2
Per-container CPU | 1.0
Per-container RAM | 100 MB
Here's an example non-default configuration:
```java
Config topologyConfig = Config.newBuilder()
.setNumContainers(5)
.setPerContainerRamInGigabytes(10)
.setPerContainerCpu(3.5f)
.setDeliverySemantics(Config.DeliverySemantics.EFFECTIVELY_ONCE)
.setSerializer(Config.Serializer.JAVA)
.setUserConfig("some-key", "some-value")
.build();
```
### Delivery semantics
You can apply [delivery semantics](../../../concepts/delivery-semantics) to a Streamlet API topology like this:
```java
topologyConfig
.setDeliverySemantics(Config.DeliverySemantics.EFFECTIVELY_ONCE);
```
The other available options in the `DeliverySemantics` enum are `ATMOST_ONCE` and `ATLEAST_ONCE`.
## Streamlets
In the Heron Streamlet API for Java, processing graphs consist of [streamlets](../../../concepts/topologies#streamlets). One or more supplier streamlets inject data into your graph to be processed by downstream operators.
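Here's a minimal sketch of a supplier streamlet (the generator used here is purely illustrative):
```java
import org.apache.heron.streamlet.Builder;
import org.apache.heron.streamlet.Streamlet;

Builder builder = Builder.newBuilder();

// A supplier streamlet that emits the current timestamp (in milliseconds)
// each time the runtime requests a new element
Streamlet<Long> timestamps = builder.newSource(System::currentTimeMillis);
```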
## Operations
Operation | Description | Example
:---------|:------------|:-------
[`map`](#map-operations) | Create a new streamlet by applying the supplied mapping function to each element in the original streamlet | Add 1 to each element in a streamlet of integers
[`flatMap`](#flatmap-operations) | Like a map operation but with the important difference that each element of the streamlet is flattened | Flatten a sentence into individual words
[`filter`](#filter-operations) | Create a new streamlet containing only the elements that satisfy the supplied filtering function | Remove all inappropriate words from a streamlet of strings
[`union`](#union-operations) | Unifies two streamlets into one, without modifying the elements of the two streamlets | Unite two different `Streamlet<String>`s into a single streamlet
[`clone`](#clone-operations) | Creates any number of identical copies of a streamlet | Create three separate streamlets from the same source
[`transform`](#transform-operations) | Transform a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations) |
[`join`](#join-operations) | Create a new streamlet by combining two separate key-value streamlets into one on the basis of each element's key. Supported join types: inner (the default), left outer, right outer, and outer. | Combine key-value pairs listing current scores (e.g. `("h4x0r", 127)`) for each user into a single per-user stream
[`keyBy`](#key-by-operations) | Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet |
[`reduceByKey`](#reduce-by-key-operations) | Produces a streamlet of key-value pairs by applying a reduce function to all the values accumulated for each key | Count the number of times a value has been encountered
[`reduceByKeyAndWindow`](#reduce-by-key-and-window-operations) | Produces a streamlet of key-value pairs by applying a reduce function to all the values accumulated for each key within a time window | Count the number of times a value has been encountered within a specified time window
[`countByKey`](#count-by-key-operations) | A special reduce operation that counts the number of tuples for each key | Count the number of times a value has been encountered
[`countByKeyAndWindow`](#count-by-key-and-window-operations) | A special reduce operation that counts the number of tuples for each key within a time window | Count the number of times a value has been encountered within a specified time window
[`split`](#split-operations) | Split a streamlet into multiple streamlets with different IDs |
[`withStream`](#with-stream-operations) | Select a stream by its ID from a streamlet that contains multiple streams |
[`applyOperator`](#apply-operator-operations) | Returns a new streamlet by applying a user-defined operator to the original streamlet | Apply an existing bolt as an operator
[`repartition`](#repartition-operations) | Create a new streamlet by applying a new parallelism level to the original streamlet | Increase the parallelism of a streamlet from 5 to 10
[`toSink`](#sink-operations) | Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc. | Store processing graph results in an AWS Redshift table
[`log`](#log-operations) | Logs the final results of a processing graph to stdout. This *must* be the last step in the graph. |
[`consume`](#consume-operations) | Consume operations are like sink operations except they don't require implementing a full sink interface (consume operations are thus suited for simple operations like logging) | Log processing graph results using a custom formatting function
### Map operations
Map operations create a new streamlet by applying the supplied mapping function to each element in the original streamlet. Here's an example:
```java
builder.newSource(() -> 1)
.map(i -> i + 12);
```
In this example, a supplier streamlet emits an indefinite series of 1s. The `map` operation then adds 12 to each incoming element, producing a streamlet of 13s.
### FlatMap operations
FlatMap operations are like `map` operations but with the important difference that each element is mapped to a collection whose contents are then "flattened" into the resulting streamlet. In this example, a supplier streamlet emits the same sentence over and over again; the `flatMap` operation maps each sentence to a Java `List` of its individual words:
```java
builder.newSource(() -> "I have nothing to declare but my genius")
.flatMap((sentence) -> Arrays.asList(sentence.split("\\s+")));
```
The effect of this operation is to transform the `Streamlet<String>` of sentences into a `Streamlet<String>` of individual words: each `List` returned by the mapping function is flattened into the output streamlet.
> One of the core differences between `map` and `flatMap` operations is that the function supplied to `flatMap` returns a collection type, whose elements are emitted one by one rather than as a single collection-valued element.
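To make that difference concrete, here's a sketch contrasting the two operations on the same input (the variable names are illustrative):
```java
import java.util.Arrays;
import java.util.List;

// map: each sentence becomes a single List<String> element
Streamlet<List<String>> sentencesAsLists = builder
    .newSource(() -> "I have nothing to declare but my genius")
    .map(sentence -> Arrays.asList(sentence.split("\\s+")));

// flatMap: each sentence is flattened into individual word elements
Streamlet<String> words = builder
    .newSource(() -> "I have nothing to declare but my genius")
    .flatMap(sentence -> Arrays.asList(sentence.split("\\s+")));
```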
### Filter operations
Filter operations create a new streamlet containing only those elements of the original streamlet that satisfy a provided filtering function. Here's an example:
```java
builder.newSource(() -> ThreadLocalRandom.current().nextInt(1, 11))
.filter((i) -> i < 7);
```
In this example, a source streamlet consisting of random integers between 1 and 10 is modified by a `filter` operation that removes all streamlet elements that are greater than 6.
### Union operations
Union operations combine two streamlets of the same type into a single streamlet without modifying the elements. Here's an example:
```java
Streamlet<String> flowers = builder.newSource(() -> "flower");
Streamlet<String> butterflies = builder.newSource(() -> "butterfly");
Streamlet<String> combinedSpringStreamlet = flowers
.union(butterflies);
```
Here, one streamlet is an endless series of "flower" strings while the other is an endless series of "butterfly" strings. The `union` operation combines them into a single streamlet of alternating "flower" and "butterfly" elements.
### Clone operations
Clone operations enable you to create any number of "copies" of a streamlet. Each of the "copy" streamlets contains all the elements of the original and can be manipulated just like the original streamlet. Here's an example:
```java
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
Streamlet<Integer> integers = builder.newSource(() -> ThreadLocalRandom.current().nextInt(100));
List<Streamlet<Integer>> copies = integers.clone(5);
Streamlet<Integer> ints1 = copies.get(0);
Streamlet<Integer> ints2 = copies.get(1);
Streamlet<Integer> ints3 = copies.get(2);
// and so on...
```
In this example, a streamlet of random integers between 0 and 99 is cloned into 5 identical streamlets.
### Transform operations
Transform operations are highly flexible operations that are most useful for:
* operations involving state in [stateful topologies](../../../concepts/delivery-semantics#stateful-topologies)
* operations that don't neatly fit into the other categories or that can't easily be expressed as a simple lambda
Transform operations require you to implement three different methods:
* A `setup` method that enables you to pass a context object to the operation and to specify what happens prior to the `transform` step
* A `transform` operation that performs the desired transformation
* A `cleanup` method that allows you to specify what happens after the `transform` step
The context object available to a transform operation provides access to:
* the current state of the topology
* the topology's configuration
* the name of the stream
* the stream partition
* the current task ID
Here's a Java example of a transform operation in a topology where a stateful record is kept of the number of items processed:
```java
import org.apache.heron.streamlet.Context;
import org.apache.heron.streamlet.SerializableTransformer;
import java.util.function.Consumer;
public class CountNumberOfItems implements SerializableTransformer<String, String> {
private int numberOfItems;
public void setup(Context context) {
numberOfItems = (int) context.getState().getOrDefault("number-of-items", 0);
context.getState().put("number-of-items", numberOfItems + 1);
}
public void transform(String in, Consumer<String> consumer) {
String transformedString = in.toUpperCase(); // Apply some operation to the incoming value (uppercasing is just an illustration)
consumer.accept(transformedString);
}
public void cleanup() {
System.out.println(
String.format("Successfully processed new state: %d", numberOfItems));
}
}
```
This operation does a few things:
* In the `setup` method, the [`Context`](/api/java/org/apache/heron/streamlet/Context.html) object is used to access the current state (which has the semantics of a Java `Map`). The current number of items processed is incremented by one and then saved as the new state.
* In the `transform` method, the incoming string is transformed in some way and then "accepted" as the new value.
* In the `cleanup` step, the current count of items processed is logged.
Here's that operation within the context of a streamlet processing graph:
```java
builder.newSource(() -> "Some string over and over");
.transform(new CountNumberOfItems())
.log();
```
### Join operations
> For a more in-depth conceptual discussion of joins, see the [Heron Streamlet API](../../../concepts/streamlet-api#join-operations) doc.
Join operations unify two streamlets *on a key* (join operations thus require KV streamlets). Each `KeyValue` object in a streamlet has, by definition, a key. When a join operation is added to a processing graph, the elements of the two streamlets are matched by key within a specified window. Here's an example:
```java
import org.apache.heron.streamlet.WindowConfig;
Builder builder = Builder.newBuilder();
KVStreamlet<String, String> streamlet1 =
builder.newKVSource(() -> new KeyValue<>("heron-api", "topology-api"));
builder.newSource(() -> new KeyValue<>("heron-api", "streamlet-api"))
.join(streamlet1, WindowConfig.TumblingCountWindow(10), KeyValue::create);
```
In this case, the resulting streamlet would consist of an indefinite stream with two `KeyValue` objects with the key `heron-api` but different values (`topology-api` and `streamlet-api`).
> The effect of a join operation is to create a new streamlet *for each key*.
### Key by operations
Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet. Here is an example:
```java
import java.util.Arrays;
Builder builder = Builder.newBuilder();
builder.newSource(() -> "Mary had a little lamb")
// Convert each sentence into individual words
.flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
.keyBy(
// Key extractor (in this case, each word acts as the key)
word -> word,
// Value extractor (get the length of each word)
word -> word.length()
)
// The result is logged
.log();
```
### Reduce by key operations
You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
* a key extractor that determines what counts as the key for the streamlet
* a value extractor that determines which final value is chosen for each element of the streamlet
* a reduce function that produces a single value for each key in the streamlet
Reduce by key operations produce a new streamlet of key-value pairs (the extracted key and the value calculated by the reduce function). Here's an example:
```java
import java.util.Arrays;
Builder builder = Builder.newBuilder();
builder.newSource(() -> "Mary had a little lamb")
// Convert each sentence into individual words
.flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
.reduceByKey(
// Key extractor (in this case, each word acts as the key)
word -> word,
// Value extractor (each word appears only once, hence the value is always 1)
word -> 1,
// Reduce operation (a running sum)
(x, y) -> x + y
)
// The result is logged
.log();
```
### Reduce by key and window operations
You can apply [reduce](https://docs.oracle.com/javase/tutorial/collections/streams/reduction.html) operations to streamlets by specifying:
* a key extractor that determines what counts as the key for the streamlet
* a value extractor that determines which final value is chosen for each element of the streamlet
* a [time window](../../../concepts/topologies#window-operations) across which the operation will take place
* a reduce function that produces a single value for each key in the streamlet
Reduce by key and window operations produce a new streamlet of key-value window objects (which include a key-value pair including the extracted key and calculated value, as well as information about the window in which the operation took place). Here's an example:
```java
import java.util.Arrays;
import org.apache.heron.streamlet.WindowConfig;
Builder builder = Builder.newBuilder();
builder.newSource(() -> "Mary had a little lamb")
// Convert each sentence into individual words
.flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
.reduceByKeyAndWindow(
// Key extractor (in this case, each word acts as the key)
word -> word,
// Value extractor (each word appears only once, hence the value is always 1)
word -> 1,
// Window configuration
WindowConfig.TumblingCountWindow(50),
// Reduce operation (a running sum)
(x, y) -> x + y
)
// The result is logged
.log();
```
### Count by key operations
Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered. Here's an example:
```java
import java.util.Arrays;
Builder builder = Builder.newBuilder();
builder.newSource(() -> "Mary had a little lamb")
// Convert each sentence into individual words
.flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
.countByKey(word -> word)
// The result is logged
.log();
```
### Count by key and window operations
Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each [time window](../../../concepts/topologies#window-operations). Here's an example:
```java
import java.util.Arrays;
import org.apache.heron.streamlet.WindowConfig;
Builder builder = Builder.newBuilder();
builder.newSource(() -> "Mary had a little lamb")
// Convert each sentence into individual words
.flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
.countByKeyAndWindow(
// Key extractor (in this case, each word acts as the key)
word -> word,
// Window configuration
WindowConfig.TumblingCountWindow(50)
)
// The result is logged
.log();
```
### Split operations
Split operations split a streamlet into multiple streamlets with different IDs, using predicates that assign each item in the original streamlet to one or more of the resulting streams. Here is an example:
```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.heron.streamlet.SerializablePredicate;

Map<String, SerializablePredicate<String>> splitter = new HashMap<>();
splitter.put("long_word", s -> s.length() >= 4);
splitter.put("short_word", s -> s.length() < 4);
Builder builder = Builder.newBuilder();
builder.newSource(() -> "Mary had a little lamb")
// Convert each sentence into individual words
.flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
// Splits the stream into streams of long and short words
.split(splitter)
// Choose the stream of the short words
.withStream("short_word")
// The result is logged
.log();
```
### With stream operations
With stream operations select a stream by its ID from a streamlet that contains multiple streams. They are often used with [split](#split-operations).
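Building on the [split](#split-operations) example above, here's a sketch that selects each named stream separately (it assumes the `splitter` map defined there, and that a split streamlet can be consumed via multiple `withStream` selections):
```java
import java.util.Arrays;

Streamlet<String> words = builder
    .newSource(() -> "Mary had a little lamb")
    .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
    // Splits the stream into streams of long and short words
    .split(splitter);

// Select and log the long words
words.withStream("long_word").log();

// Select and log the short words
words.withStream("short_word").log();
```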
### Apply operator operations
Apply operator operations apply a user-defined operator (such as a bolt) to each element of the original streamlet and return a new streamlet. Here is an example:
```java
import java.util.Arrays;
// MyBolt is assumed to be an existing bolt implementation; implementing the
// IStreamletRichOperator interface allows it to be applied to a streamlet
class MyBoltOperator extends MyBolt implements IStreamletRichOperator<String, String> {
}
Builder builder = Builder.newBuilder();
builder.newSource(() -> "Mary had a little lamb")
// Convert each sentence into individual words
.flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
// Apply user defined operation
.applyOperator(new MyBoltOperator())
// The result is logged
.log();
```
### Repartition operations
When you assign a number of [partitions](#partitioning-and-parallelism) to a processing step, each step that comes after it inherits that number of partitions. Thus, if you assign 5 partitions to a `map` operation, then any `mapToKV`, `flatMap`, `filter`, etc. operations that come after it will also be assigned 5 partitions. But you can also change the number of partitions for a processing step (as well as the number of partitions for downstream operations) using `repartition`. Here's an example:
```java
import java.util.concurrent.ThreadLocalRandom;
Builder builder = Builder.newBuilder();
builder.newSource(() -> ThreadLocalRandom.current().nextInt(1, 11))
.setNumPartitions(5)
.map(i -> i + 1)
.repartition(2)
.filter(i -> i < 2 || i > 7)
.log();
```
In this example, the supplier streamlet emits random integers between one and ten. That operation is assigned 5 partitions. After the `map` operation, the `repartition` function is used to assign 2 partitions to all downstream operations.
### Sink operations
In processing graphs like the ones you build using the Heron Streamlet API, **sinks** are essentially the terminal points in your graph, where your processing logic comes to an end. A processing graph can end with writing to a database, publishing to a topic in a pub-sub messaging system, and so on. With the Streamlet API, you can implement your own custom sinks. Here's an example:
```java
import org.apache.heron.streamlet.Context;
import org.apache.heron.streamlet.Sink;
public class FormattedLogSink<T> implements Sink<T> {
private String streamletName;
public void setup(Context context) {
streamletName = context.getStreamName();
}
public void put(T element) {
String message = String.format("Streamlet %s has produced an element with a value of: '%s'",
streamletName,
element.toString());
System.out.println(message);
}
public void cleanup() {}
}
```
In this example, the sink fetches the name of the enclosing streamlet from the context passed in the `setup` method. The `put` method specifies how the sink handles each element that is received (in this case, a formatted message is logged to stdout). The `cleanup` method enables you to specify what happens when the sink shuts down (in this case, nothing).
Here is the `FormattedLogSink` at work in an example processing graph:
```java
Builder builder = Builder.newBuilder();
builder.newSource(() -> "Here is a string to be passed to the sink")
.toSink(new FormattedLogSink<>());
```
> [Log operations](#log-operations) rely on a log sink that is provided out of the box. You'll need to implement other sinks yourself.
### Log operations
Log operations are special cases of consume operations that log streamlet elements to stdout.
> Streamlet elements are logged using their `toString` representations, at the `INFO` level.
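Here's a minimal example that terminates a graph with a log operation:
```java
import java.util.concurrent.ThreadLocalRandom;

Builder builder = Builder.newBuilder();

builder.newSource(() -> ThreadLocalRandom.current().nextInt(1, 11))
    // Each random integer is logged at the INFO level via its toString form
    .log();
```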
### Consume operations
Consume operations are like [sink operations](#sink-operations) except they don't require implementing a full sink interface. Consume operations are thus suited for simple operations like formatted logging. Here's an example:
```java
import java.util.concurrent.ThreadLocalRandom;
Builder builder = Builder.newBuilder();
builder.newSource(() -> ThreadLocalRandom.current().nextInt(1, 11))
.filter(i -> i % 2 == 0)
.consume(i -> {
String message = String.format("Even number found: %d", i);
System.out.println(message);
});
```