When it was first released, Heron offered a Topology API---heavily indebted to the Storm API---for developing topology logic. In the original Topology API, developers creating topologies were required to explicitly:
Although the Storm-inspired API provided a powerful low-level interface for creating topologies, the spouts-and-bolts model also presented a variety of drawbacks for Heron developers:
Drawback | Description |
---|---|
Verbosity | In the original Topology API for both Java and Python, creating spouts and bolts required substantial boilerplate and forced developers to both provide implementations for spout and bolt classes and also to specify the connections between those spouts and bolts. |
Difficult debugging | When spouts, bolts, and the connections between them need to be created “by hand,” it can be challenging to trace the origin of problems in the topology's processing chain. |
Tuple-based data model | In the older topology API, spouts and bolts passed tuples and nothing but tuples within topologies. Although tuples are a powerful and flexible data type, the topology API forced all spouts and bolts to implement their own serialization/deserialization logic. |
In contrast with the Topology API, the Heron Streamlet API offers:
Advantage | Description |
---|---|
Boilerplate-free code | Instead of needing to implement spout and bolt classes over and over again, the Heron Streamlet API enables you to create stream processing logic out of functions, such as map, flatMap, join, and filter functions, instead. |
Easy debugging | With the Heron Streamlet API, you don't have to worry about spouts and bolts, which means that you can more easily surface problems with your processing logic. |
Completely flexible, type-safe data model | Instead of requiring that all processing components pass tuples to one another (which implicitly requires serialization to and deserialization from your application-specific types), the Heron Streamlet API enables you to write your processing logic in accordance with whatever types you'd like---including tuples, if you wish. In the Streamlet API for Java, all streamlets are typed (e.g. Streamlet<MyApplicationType> ), which means that type errors can be caught at compile time rather than at runtime. |
Instead of spouts and bolts, as with the Topology API, the Streamlet API enables you to create processing graphs that are then automatically converted to spouts and bolts under the hood. Processing graphs consist of the following components:
The diagram below illustrates both the general model (with a single source, three operators, and one sink), and a more concrete example that includes two sources (an Apache Pulsar topic and the Twitter API), three operators (a join, flatMap, and reduce operation), and two sinks (an Apache Cassandra table and an Apache Spark job).
The core construct underlying the Heron Streamlet API is that of the streamlet. A streamlet is an unbounded, ordered collection of elements of some data type (streamlets can consist of simple types like integers and strings or more complex, application-specific data types).
Source streamlets supply a Heron processing graph with data inputs. These inputs can come from a wide variety of sources, such as pub-sub messaging systems like Apache Kafka and Apache Pulsar (incubating), random generators, or static files like CSV or Apache Parquet files.
Source streamlets can then be manipulated in a wide variety of ways. You can, for example:
The diagram below shows an example streamlet:
In this diagram, the source streamlet is produced by a random generator that continuously emits random integers between 1 and 100. From there:
The Heron Streamlet API is currently available for:
With the Heron Streamlet API you still create topologies, but only implicitly. Heron automatically performs the heavy lifting of converting the streamlet-based processing logic that you create into spouts and bolts and, from there, into containers that are then deployed using whichever scheduler your Heron cluster relies upon.
From the standpoint of both operators and developers managing topologies' lifecycles, the resulting topologies are equivalent. From a development workflow standpoint, however, the difference is profound. You can think of the Streamlet API as a highly convenient tool for creating spouts, bolts, and the logic that connects them.
The basic workflow looks like this:
When creating topologies using the Heron Streamlet API, you simply write code (example below) in a highly functional style. From there:
With a physical plan in place, the Streamlet API topology can be submitted to a Heron cluster.
The code below shows how you could implement the processing graph shown above in Java:
    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.heron.streamlet.Builder;
    import org.apache.heron.streamlet.Config;
    import org.apache.heron.streamlet.Runner;

    Builder builder = Builder.newBuilder();

    // Function for generating random integers
    int randomInt(int lower, int upper) {
        return ThreadLocalRandom.current().nextInt(lower, upper + 1);
    }

    // Source streamlet
    builder.newSource(() -> randomInt(1, 100))
        // Filter operation
        .filter(i -> i > 30)
        // Map operation
        .map(i -> i + 15)
        // Log sink
        .log();

    Config config = new Config();
    // This topology will be spread across two containers
    config.setNumContainers(2);

    // Submit the processing graph to Heron as a topology
    new Runner("IntegerProcessingGraph", config, builder).run();
As you can see, the Java code for the example streamlet processing graph requires very little boilerplate and is heavily indebted to Java lambda patterns.
In the Heron Streamlet API, processing data means transforming streamlets into other streamlets. This can be done using a wide variety of available operations, including many that you may be familiar with from functional programming:
Operation | Description |
---|---|
map | Returns a new streamlet by applying the supplied mapping function to each element in the original streamlet |
flatMap | Like a map operation, but with the important difference that the supplied function returns a collection for each element and those collections are flattened into the resulting streamlet |
filter | Returns a new streamlet containing only the elements that satisfy the supplied filtering function |
union | Unifies two streamlets into one, without windowing or modifying the elements of the two streamlets |
clone | Creates any number of identical copies of a streamlet |
transform | Transforms a streamlet using whichever logic you'd like (useful for transformations that don't neatly map onto the available operations) |
keyBy | Returns a new key-value streamlet by applying the supplied extractors to each element in the original streamlet |
reduceByKey | Produces a streamlet of key-value pairs, one per key, by applying a reduce function to all the values accumulated for that key |
reduceByKeyAndWindow | Produces a streamlet of key-value pairs, one per key within a time window, by applying a reduce function to all the values accumulated for that key |
countByKey | A special reduce operation that counts the number of tuples for each key |
countByKeyAndWindow | A special reduce operation that counts the number of tuples for each key within a time window |
split | Splits a streamlet into multiple streamlets with different ids |
withStream | Selects the stream with a given id from a streamlet that contains multiple streams |
applyOperator | Returns a new streamlet by applying a user-defined operator to the original streamlet |
join | Joins two separate key-value streamlets into a single streamlet on a key, within a time window, and in accordance with a join function |
log | Logs the final streamlet output of the processing graph to stdout |
toSink | Sink operations terminate the processing graph by storing elements in a database, logging elements to stdout, etc. |
consume | Consume operations are like sink operations except they don't require implementing a full sink interface (consume operations are thus suited for simple operations like logging) |
Map operations create a new streamlet by applying the supplied mapping function to each element in the original streamlet.
    import org.apache.heron.streamlet.Builder;
    import org.apache.heron.streamlet.Streamlet;

    Builder processingGraphBuilder = Builder.newBuilder();

    Streamlet<Integer> ones = processingGraphBuilder.newSource(() -> 1);
    Streamlet<Integer> thirteens = ones.map(i -> i + 12);
In this example, a supplier streamlet emits an indefinite series of 1s. The map operation then adds 12 to each incoming element, producing a streamlet of 13s. The effect of this operation is to transform the Streamlet<Integer> into a Streamlet<Integer> with different values (map operations can also convert streamlets into streamlets of a different type).
FlatMap operations are like map operations but with the important difference that the supplied function returns a collection for each element, and the elements of that collection are “flattened” into the resulting streamlet. In the Java example below, a supplier streamlet emits the same sentence over and over again; the flatMap operation splits each sentence into a Java List of individual words, which are then emitted one by one.
    import java.util.Arrays;

    Streamlet<String> sentences = builder.newSource(() -> "I have nothing to declare but my genius");
    Streamlet<String> words = sentences
        .flatMap(sentence -> Arrays.asList(sentence.split("\\s+")));
The effect of this operation is to transform the Streamlet<String> of sentences into a Streamlet<String> containing each word emitted by the source streamlet.
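To make the flattening behaviour concrete without a Heron cluster, here is a minimal sketch that uses plain java.util.stream on a finite list. It is an analogy only, not the Streamlet API; the class FlatMapSketch and its method names are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java analogy (not Heron code): contrasts map and flatMap on a finite list.
final class FlatMapSketch {
    // map keeps one output per input: each sentence becomes one List of words.
    static List<List<String>> mapToWordLists(List<String> sentences) {
        return sentences.stream()
            .map(s -> Arrays.asList(s.split("\\s+")))
            .collect(Collectors.toList());
    }

    // flatMap flattens the per-sentence collections into one sequence of words.
    static List<String> flatMapToWords(List<String> sentences) {
        return sentences.stream()
            .flatMap(s -> Arrays.stream(s.split("\\s+")))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> sentences = Arrays.asList("I have nothing", "to declare");
        System.out.println(mapToWordLists(sentences)); // [[I, have, nothing], [to, declare]]
        System.out.println(flatMapToWords(sentences)); // [I, have, nothing, to, declare]
    }
}
```

The map variant yields one list per sentence, whereas the flatMap variant yields the individual words, which is the shape that downstream per-word operations expect.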
Filter operations retain some elements in a streamlet and exclude other elements on the basis of a provided filtering function.
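    import java.util.concurrent.ThreadLocalRandom;

    Streamlet<Integer> randomInts =
        builder.newSource(() -> ThreadLocalRandom.current().nextInt(1, 11));
    // Keep only the integers that are less than or equal to 7
    Streamlet<Integer> sevenOrLess = randomInts
        .filter(i -> i <= 7);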
In this example, a source streamlet consisting of random integers between 1 and 10 is modified by a filter operation that removes all streamlet elements that are greater than 7.
Union operations combine two streamlets of the same type into a single streamlet without modifying the elements.
    Streamlet<String> oohs = builder.newSource(() -> "ooh");
    Streamlet<String> aahs = builder.newSource(() -> "aah");

    Streamlet<String> combined = oohs
        .union(aahs);
Here, one streamlet is an endless series of “ooh”s while the other is an endless series of “aah”s. The union operation combines them into a single streamlet containing both the “ooh”s and the “aah”s, interleaved in whatever order they arrive.
Clone operations enable you to create any number of “copies” of a streamlet. Each of the “copy” streamlets contains all the elements of the original and can be manipulated just like the original streamlet.
    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;

    Streamlet<Integer> integers = builder.newSource(() -> ThreadLocalRandom.current().nextInt(100));

    List<Streamlet<Integer>> copies = integers.clone(5);
    Streamlet<Integer> ints1 = copies.get(0);
    Streamlet<Integer> ints2 = copies.get(1);
    Streamlet<Integer> ints3 = copies.get(2);
    // and so on...
In this example, a streamlet of random integers between 0 and 99 is cloned into 5 identical streamlets.
Transform operations are highly flexible operations that are most useful for:
Transform operations require you to implement three different methods:
- A setup method that enables you to pass a context object to the operation and to specify what happens prior to the transform step
- A transform operation that performs the desired transformation
- A cleanup method that allows you to specify what happens after the transform step

The context object available to a transform operation provides access to:
Here's a Java example of a transform operation in a topology where a stateful record is kept of the number of items processed:
    import java.util.function.Consumer;
    import org.apache.heron.streamlet.Context;
    import org.apache.heron.streamlet.SerializableTransformer;

    public class CountNumberOfItems implements SerializableTransformer<String, String> {
        private int numberOfItems;

        public void setup(Context context) {
            // Assumes the state object behaves like a java.util.Map;
            // defaults to 0 if no count has been stored yet
            numberOfItems = (Integer) context.getState().getOrDefault("number-of-items", 0);
            context.getState().put("number-of-items", numberOfItems + 1);
        }

        public void transform(String in, Consumer<String> consumer) {
            // Apply some operation to the incoming value
            // (the identity is used here as a placeholder)
            String transformedString = in;
            consumer.accept(transformedString);
        }

        public void cleanup() {
            System.out.println(
                String.format("Successfully processed new state: %d", numberOfItems));
        }
    }
This operation does a few things:
- In the setup method, the Context object is used to access the current state (which has the semantics of a Java Map). The current number of items processed is incremented by one and then saved as the new state.
- In the transform method, the incoming string is transformed in some way and then “accepted” as the new value.
- In the cleanup step, the current count of items processed is logged.

Here's that operation within the context of a streamlet processing graph:
    builder.newSource(() -> "Some string over and over")
        .transform(new CountNumberOfItems())
        .log();
Key by operations convert each item in the original streamlet into a key-value pair and return a new streamlet.
    import java.util.Arrays;

    Builder builder = Builder.newBuilder();
    builder.newSource(() -> "Mary had a little lamb")
        // Convert each sentence into individual words
        .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
        .keyBy(
            // Key extractor (in this case, each word acts as the key)
            word -> word,
            // Value extractor (get the length of each word)
            word -> word.length()
        )
        // The result is logged
        .log();
You can apply reduce operations to streamlets by specifying:
Reduce by key operations produce a new streamlet of key-value pairs (each consisting of the extracted key and the calculated value).
    import java.util.Arrays;

    Builder builder = Builder.newBuilder();
    builder.newSource(() -> "Mary had a little lamb")
        // Convert each sentence into individual words
        .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
        .reduceByKey(
            // Key extractor (in this case, each word acts as the key)
            word -> word,
            // Value extractor (each word appears only once, hence the value is always 1)
            word -> 1,
            // Reduce operation (a running sum)
            (x, y) -> x + y
        )
        // The result is logged
        .log();
You can apply reduce operations to streamlets by specifying:
Reduce by key and window operations produce a new streamlet of key-value window objects (which include a key-value pair including the extracted key and calculated value, as well as information about the window in which the operation took place).
    import java.util.Arrays;
    import org.apache.heron.streamlet.WindowConfig;

    Builder builder = Builder.newBuilder();

    builder.newSource(() -> "Mary had a little lamb")
        .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
        .reduceByKeyAndWindow(
            // Key extractor (in this case, each word acts as the key)
            word -> word,
            // Value extractor (each word appears only once, hence the value is always 1)
            word -> 1,
            // Window configuration
            WindowConfig.TumblingCountWindow(50),
            // Reduce operation (a running sum)
            (x, y) -> x + y
        )
        // The result is logged
        .log();
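As a rough illustration of what a reduce over a tumbling count window computes, the following plain-Java sketch groups a finite list of words into consecutive, non-overlapping windows of a fixed size and sums a value of 1 per word inside each window. It does not use Heron; the class TumblingCountWindowSketch is hypothetical, and dropping an incomplete trailing window is an assumption of this sketch rather than documented Heron behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java analogy (not Heron code): per-window word counts over tumbling count windows.
final class TumblingCountWindowSketch {
    static List<Map<String, Integer>> countPerWindow(List<String> words, int windowSize) {
        List<Map<String, Integer>> results = new ArrayList<>();
        // Each window covers windowSize consecutive elements; windows never overlap.
        for (int start = 0; start + windowSize <= words.size(); start += windowSize) {
            Map<String, Integer> counts = new LinkedHashMap<>();
            for (String word : words.subList(start, start + windowSize)) {
                // Key extractor: the word itself; value extractor: 1; reduce: (x, y) -> x + y
                counts.merge(word, 1, (x, y) -> x + y);
            }
            results.add(counts);
        }
        // A trailing, incomplete window is not emitted in this sketch.
        return results;
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("a", "b", "a", "c", "b", "b", "a", "a", "z");
        System.out.println(countPerWindow(words, 4)); // [{a=2, b=1, c=1}, {b=2, a=2}]
    }
}
```

Note that the counts restart in every window, which is the difference from a plain reduce by key, where values keep accumulating.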
Count by key operations extract keys from data in the original streamlet and count the number of times a key has been encountered.
    import java.util.Arrays;

    Builder builder = Builder.newBuilder();
    builder.newSource(() -> "Mary had a little lamb")
        // Convert each sentence into individual words
        .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
        // Key extractor (in this case, each word acts as the key)
        .countByKey(word -> word)
        // The result is logged
        .log();
Count by key and window operations extract keys from data in the original streamlet and count the number of times a key has been encountered within each time window.
    import java.util.Arrays;
    import org.apache.heron.streamlet.WindowConfig;

    Builder builder = Builder.newBuilder();
    builder.newSource(() -> "Mary had a little lamb")
        // Convert each sentence into individual words
        .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
        .countByKeyAndWindow(
            // Key extractor (in this case, each word acts as the key)
            word -> word,
            // Window configuration
            WindowConfig.TumblingCountWindow(50)
        )
        // The result is logged
        .log();
Split operations split a streamlet into multiple streamlets with different ids by getting the corresponding stream ids from each item in the original streamlet.
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.Map;

    Map<String, SerializablePredicate<String>> splitter = new HashMap<>();
    splitter.put("long_word", s -> s.length() >= 4);
    splitter.put("short_word", s -> s.length() < 4);

    Builder builder = Builder.newBuilder();
    builder.newSource(() -> "Mary had a little lamb")
        // Convert each sentence into individual words
        .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
        // Splits the stream into streams of long and short words
        .split(splitter)
        // Choose the stream of the short words
        .withStream("short_word")
        // The result is logged
        .log();
With stream operations select the stream with a given id from a streamlet that contains multiple streams. They are often used together with split operations.
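The interplay of split and with-stream can be pictured with the plain-Java sketch below. It is not Heron code: SplitSketch is a hypothetical class, java.util.function.Predicate stands in for the serializable predicate type, and the sketch assumes that an element is routed to every stream id whose predicate it satisfies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Plain-Java analogy (not Heron code): route words to named streams by predicate.
final class SplitSketch {
    static Map<String, List<String>> split(
            List<String> words, Map<String, Predicate<String>> splitter) {
        Map<String, List<String>> streams = new LinkedHashMap<>();
        for (String id : splitter.keySet()) {
            streams.put(id, new ArrayList<>());
        }
        for (String word : words) {
            for (Map.Entry<String, Predicate<String>> entry : splitter.entrySet()) {
                // Assumption: a word joins every stream whose predicate accepts it.
                if (entry.getValue().test(word)) {
                    streams.get(entry.getKey()).add(word);
                }
            }
        }
        return streams;
    }

    public static void main(String[] args) {
        Map<String, Predicate<String>> splitter = new LinkedHashMap<>();
        splitter.put("long_word", s -> s.length() >= 4);
        splitter.put("short_word", s -> s.length() < 4);

        List<String> words = Arrays.asList("mary", "had", "a", "little", "lamb");
        Map<String, List<String>> streams = split(words, splitter);

        // Selecting one stream by id plays the role of withStream("short_word").
        System.out.println(streams.get("short_word")); // [had, a]
        System.out.println(streams.get("long_word"));  // [mary, little, lamb]
    }
}
```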
Apply operator operations apply a user-defined operator (like a bolt) to each element of the original streamlet and return a new streamlet.
    import java.util.Arrays;

    // MyBolt is assumed to be an existing bolt that processes String tuples
    private class MyBoltOperator extends MyBolt
        implements IStreamletRichOperator<String, String> {
    }

    Builder builder = Builder.newBuilder();
    builder.newSource(() -> "Mary had a little lamb")
        // Convert each sentence into individual words
        .flatMap(sentence -> Arrays.asList(sentence.toLowerCase().split("\\s+")))
        // Apply user defined operation
        .applyOperator(new MyBoltOperator())
        // The result is logged
        .log();
Join operations in the Streamlet API take two streamlets (a “left” and a “right” streamlet) and join them together:
You may already be familiar with JOIN operations in SQL databases, like this:
    SELECT all_users.username, all_users.email
    FROM all_users
    INNER JOIN banned_users
    ON all_users.username = banned_users.username;
If you'd like to unite two streamlets into one without applying a window or a join function, you can use a union operation, which is available for key-value streamlets as well as normal streamlets.
All join operations are performed:
The Heron Streamlet API supports four types of joins:
Type | What the join operation yields | Default? |
---|---|---|
Inner | All key-values with matched keys across the left and right stream | Yes |
Outer left | All key-values with matched keys across both streams plus unmatched keys in the left stream | |
Outer right | All key-values with matched keys across both streams plus unmatched keys in the right stream | |
Outer | All key-values across both the left and right stream, regardless of whether or not any given element has a matching key in the other stream | |
Inner joins operate over the Cartesian product of the left stream and the right stream, i.e. over the whole set of ordered pairs between the two streams. Imagine this set of key-value pairs accumulated within a time window:
Left streamlet | Right streamlet |
---|---|
(“player1”, 4) | (“player1”, 10) |
(“player1”, 5) | (“player1”, 12) |
(“player1”, 17) | (“player2”, 27) |
An inner join operation would thus apply the join function to every pair of key-values with matching keys, 3 × 2 = 6 pairs in total, and would include this set of key-values:
Included key-values |
---|
(“player1”, 4) |
(“player1”, 5) |
(“player1”, 10) |
(“player1”, 12) |
(“player1”, 17) |
Note that the ("player2", 27) key-value pair was not included in the stream because there's no matching key-value in the left streamlet.
If the supplied join function, say, added the values together, then the resulting joined stream would look like this:
Operation | Joined Streamlet |
---|---|
4 + 10 | (“player1”, 14) |
4 + 12 | (“player1”, 16) |
5 + 10 | (“player1”, 15) |
5 + 12 | (“player1”, 17) |
17 + 10 | (“player1”, 27) |
17 + 12 | (“player1”, 29) |
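The table above can be reproduced with a few lines of plain Java. The sketch below is not the Heron join implementation; InnerJoinSketch and its kv helper are hypothetical names, and the window is modelled simply as two finite lists. It pairs every left key-value with every right key-value that has the same key and applies the join function to the two values.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java analogy (not Heron code): inner join over the contents of one window.
final class InnerJoinSketch {
    static Map.Entry<String, Integer> kv(String key, int value) {
        return new SimpleEntry<>(key, value);
    }

    static List<Map.Entry<String, Integer>> innerJoin(
            List<Map.Entry<String, Integer>> left,
            List<Map.Entry<String, Integer>> right,
            BinaryOperator<Integer> joinFn) {
        List<Map.Entry<String, Integer>> joined = new ArrayList<>();
        // Walk the Cartesian product and keep only the pairs whose keys match.
        for (Map.Entry<String, Integer> l : left) {
            for (Map.Entry<String, Integer> r : right) {
                if (l.getKey().equals(r.getKey())) {
                    joined.add(kv(l.getKey(), joinFn.apply(l.getValue(), r.getValue())));
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> left =
            Arrays.asList(kv("player1", 4), kv("player1", 5), kv("player1", 17));
        List<Map.Entry<String, Integer>> right =
            Arrays.asList(kv("player1", 10), kv("player1", 12), kv("player2", 27));

        // Join function: add the two values together.
        System.out.println(innerJoin(left, right, (x, y) -> x + y));
        // [player1=14, player1=16, player1=15, player1=17, player1=27, player1=29]
    }
}
```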
Inner joins are the “default” join type in the Heron Streamlet API. If you call the join method without specifying a join type, an inner join will be applied.
    class Score {
        String playerUsername;
        int playerScore;

        // Setters and getters
    }

    Streamlet<Score> scores1 = /* A stream of player scores */;
    Streamlet<Score> scores2 = /* A second stream of player scores */;

    scores1
        .join(
            scores2,
            // Key extractor for the left stream (scores1)
            score -> score.getPlayerUsername(),
            // Key extractor for the right stream (scores2)
            score -> score.getPlayerUsername(),
            // Window configuration
            WindowConfig.TumblingCountWindow(50),
            // Join function (selects the larger score as the value
            // using a ternary operator)
            (x, y) ->
                (x.getPlayerScore() >= y.getPlayerScore()) ?
                    x.getPlayerScore() :
                    y.getPlayerScore()
        )
        .log();
In this example, two streamlets consisting of Score objects are joined. In the join call, a key extractor for each stream is supplied along with a window configuration and a join function. The resulting, joined streamlet will consist of key-value pairs in which each player's username will be the key and the joined---in this case highest---score will be the value.
By default, an inner join is applied in join operations but you can also specify a different join type. Here's a Java example for an outer right join:
    import org.apache.heron.streamlet.JoinType;

    scores1
        .join(
            scores2,
            // Key extractor for the left stream (scores1)
            score -> score.getPlayerUsername(),
            // Key extractor for the right stream (scores2)
            score -> score.getPlayerUsername(),
            // Window configuration
            WindowConfig.TumblingCountWindow(50),
            // Join type
            JoinType.OUTER_RIGHT,
            // Join function (selects the larger score as the value
            // using a ternary operator)
            (x, y) ->
                (x.getPlayerScore() >= y.getPlayerScore()) ?
                    x.getPlayerScore() :
                    y.getPlayerScore()
        )
        .log();
An outer left join includes the results of an inner join plus all of the unmatched keys in the left stream. Take this example left and right streamlet:
Left streamlet | Right streamlet |
---|---|
(“player1”, 4) | (“player1”, 10) |
(“player2”, 5) | (“player4”, 12) |
(“player3”, 17) | |
The resulting set of key-values within the time window:
Included key-values |
---|
(“player1”, 4) |
(“player1”, 10) |
(“player2”, 5) |
(“player3”, 17) |
In this case, key-values with a key of player4 are excluded because they are in the right stream but have no matching key with any element in the left stream.
An outer right join includes the results of an inner join plus all of the unmatched keys in the right stream. Take this example left and right streamlet (from above):
Left streamlet | Right streamlet |
---|---|
(“player1”, 4) | (“player1”, 10) |
(“player2”, 5) | (“player4”, 12) |
(“player3”, 17) | |
The resulting set of key-values within the time window:
Included key-values |
---|
(“player1”, 4) |
(“player1”, 10) |
(“player4”, 12) |
In this case, key-values with a key of player2 or player3 are excluded because they are in the left stream but have no matching key with any element in the right stream.
Outer joins include all key-values across both the left and right stream, regardless of whether or not any given element has a matching key in the other stream. If you want to ensure that no element is left out of a resulting joined streamlet, use an outer join. Take this example left and right streamlet (from above):
Left streamlet | Right streamlet |
---|---|
(“player1”, 4) | (“player1”, 10) |
(“player2”, 5) | (“player4”, 12) |
(“player3”, 17) | |
The resulting set of key-values within the time window:
Included key-values |
---|
(“player1”, 4) |
(“player1”, 10) |
(“player2”, 5) |
(“player4”, 12) |
(“player3”, 17) |
Note that all key-values were indiscriminately included in the joined set.
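The four join types differ only in which keys survive. As a compact summary, the plain-Java sketch below computes the retained keys for the example streams used above. JoinTypeSketch and its JoinKind enum are hypothetical names for illustration and are not Heron's JoinType; the sketch looks only at keys and ignores values and windowing.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Plain-Java analogy (not Heron code): which keys each join type retains.
final class JoinTypeSketch {
    enum JoinKind { INNER, OUTER_LEFT, OUTER_RIGHT, OUTER }

    static Set<String> retainedKeys(
            List<String> leftKeys, List<String> rightKeys, JoinKind kind) {
        Set<String> left = new TreeSet<>(leftKeys);
        Set<String> right = new TreeSet<>(rightKeys);
        Set<String> result = new TreeSet<>();
        switch (kind) {
            case INNER:       // keys present in both streams
                result.addAll(left);
                result.retainAll(right);
                break;
            case OUTER_LEFT:  // matched keys plus unmatched keys from the left stream
                result.addAll(left);
                break;
            case OUTER_RIGHT: // matched keys plus unmatched keys from the right stream
                result.addAll(right);
                break;
            case OUTER:       // every key from either stream
                result.addAll(left);
                result.addAll(right);
                break;
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> left = Arrays.asList("player1", "player2", "player3");
        List<String> right = Arrays.asList("player1", "player4");
        for (JoinKind kind : JoinKind.values()) {
            System.out.println(kind + " -> " + retainedKeys(left, right, kind));
        }
        // INNER -> [player1]
        // OUTER_LEFT -> [player1, player2, player3]
        // OUTER_RIGHT -> [player1, player4]
        // OUTER -> [player1, player2, player3, player4]
    }
}
```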
In processing graphs like the ones you build using the Heron Streamlet API, sinks are essentially the terminal points in your graph, where your processing logic comes to an end. A processing graph can end with writing to a database, publishing to a topic in a pub-sub messaging system, and so on. With the Streamlet API, you can implement your own custom sinks.
    import org.apache.heron.streamlet.Context;
    import org.apache.heron.streamlet.Sink;

    // The type parameter T is declared on the class so that the sink
    // can accept elements of any type
    public class FormattedLogSink<T> implements Sink<T> {
        private String streamletName;

        public void setup(Context context) {
            streamletName = context.getStreamletName();
        }

        public void put(T element) {
            String message = String.format(
                "Streamlet %s has produced an element with a value of: '%s'",
                streamletName, element.toString());
            System.out.println(message);
        }

        public void cleanup() {}
    }
In this example, the sink fetches the name of the enclosing streamlet from the context passed in the setup method. The put method specifies how the sink handles each element that is received (in this case, a formatted message is logged to stdout). The cleanup method enables you to specify what happens after the element has been processed by the sink.
Here is the FormattedLogSink at work in an example processing graph:
    Builder builder = Builder.newBuilder();

    builder.newSource(() -> "Here is a string to be passed to the sink")
        .toSink(new FormattedLogSink());
Log operations rely on a log sink that is provided out of the box. You'll need to implement other sinks yourself.
Consume operations are like sink operations except they don't require implementing a full sink interface. Consume operations are thus suited for simple operations like formatted logging.
    Builder builder = Builder.newBuilder();
    // generateRandomInteger() is assumed to be a helper defined elsewhere
    builder.newSource(() -> generateRandomInteger())
        .filter(i -> i % 2 == 0)
        .consume(i -> {
            String message = String.format("Even number found: %d", i);
            System.out.println(message);
        });
In the topology API, processing parallelism can be managed via adjusting the number of spouts and bolts performing different operations, enabling you to, for example, increase the relative parallelism of a bolt by using three of that bolt instead of two.
The Heron Streamlet API provides a different mechanism for controlling parallelism: partitioning. To understand partitioning, keep in mind that rather than physical spouts and bolts, the core processing construct in the Heron Streamlet API is the processing step. With the Heron Streamlet API, you can explicitly assign a number of partitions to each processing step in your graph (the default is one partition).
The example topology above, for example, has five steps:
You could apply varying numbers of partitions to each step in that topology like this:
    import java.util.concurrent.ThreadLocalRandom;

    Builder builder = Builder.newBuilder();

    Streamlet<Integer> zeroes = builder.newSource(() -> 0)
        .setName("zeroes");

    builder.newSource(() -> ThreadLocalRandom.current().nextInt(1, 11))
        .setName("random-ints")
        .setNumPartitions(3)
        .map(i -> i + 1)
        .setName("add-one")
        .repartition(3)
        .union(zeroes)
        .setName("unify-streams")
        .repartition(2)
        .filter(i -> i != 2)
        .setName("remove-all-twos")
        .repartition(1)
        .log();
As explained above, when you set a number of partitions for a specific operation (including for source streamlets), the same number of partitions is applied to all downstream operations until a different number is explicitly set.
    import java.util.Arrays;
    import java.util.concurrent.ThreadLocalRandom;

    Builder builder = Builder.newBuilder();

    builder.newSource(() -> ThreadLocalRandom.current().nextInt(1, 11))
        .repartition(4, (element, numPartitions) -> {
            if (element > 5) {
                return Arrays.asList(0, 1);
            } else {
                return Arrays.asList(2, 3);
            }
        });
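To see what such a partition function does to individual elements, here is a plain-Java sketch of the same routing rule applied to a finite list. It is not Heron code: PartitionRoutingSketch is a hypothetical class, java.util.function.BiFunction stands in for the serializable function type, and the sketch assumes that an element is delivered to every partition index the function returns.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;

// Plain-Java analogy (not Heron code): apply a custom partition function to a finite list.
final class PartitionRoutingSketch {
    // Same rule as the repartition example: values above 5 go to partitions 0 and 1,
    // all other values go to partitions 2 and 3.
    static final BiFunction<Integer, Integer, List<Integer>> ROUTER =
        (element, numPartitions) ->
            element > 5 ? Arrays.asList(0, 1) : Arrays.asList(2, 3);

    static List<List<Integer>> route(List<Integer> elements, int numPartitions) {
        List<List<Integer>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (Integer element : elements) {
            // Assumption: the element is copied to each partition index returned.
            for (int target : ROUTER.apply(element, numPartitions)) {
                partitions.get(target).add(element);
            }
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(route(Arrays.asList(3, 8, 5, 10), 4));
        // [[8, 10], [8, 10], [3, 5], [3, 5]]
    }
}
```

The indices returned by the function must be smaller than the number of partitions passed to the repartition call, otherwise there is no partition to receive the element.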