New Streamlet API for Heron
As of version 0.16.0, Heron offers a new Streamlet API that you can use to write topologies in a more declarative, functional manner, without needing to specify spout and bolt logic directly. The Streamlet API is currently in beta and available for Java. The Streamlet API for Python will be available soon.
More information on the Streamlet API can be found below.
A Heron topology is a directed acyclic graph (DAG) used to process streams of data. Topologies can be stateless or stateful depending on your use case.
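Heron topologies consist of two basic components: spouts, which emit streams of tuples into the topology, and bolts, which consume those tuples and process them.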
Spouts and bolts are connected to one another via streams of data. Below is a visual illustration of a simple Heron topology:
In the diagram above, spout S1 feeds data to bolts B1 and B2 for processing; in turn, bolt B1 feeds processed data to bolts B3 and B4, while bolt B2 feeds processed data to bolt B4. This is just a simple example; you can create arbitrarily complex topologies in Heron.
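The structure described above can be written down as a small adjacency list. The following is a minimal sketch in plain Java, not Heron code: the class and method names are hypothetical, and the ordering routine (Kahn's algorithm) is only there to show that the example graph is acyclic.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not part of any Heron API.
final class TopologyGraphSketch {
    // Edges of the example topology: each component maps to its downstream components.
    static Map<String, List<String>> exampleEdges() {
        Map<String, List<String>> edges = new LinkedHashMap<>();
        edges.put("S1", Arrays.asList("B1", "B2"));
        edges.put("B1", Arrays.asList("B3", "B4"));
        edges.put("B2", Collections.singletonList("B4"));
        edges.put("B3", Collections.<String>emptyList());
        edges.put("B4", Collections.<String>emptyList());
        return edges;
    }

    // Kahn's algorithm: returns an upstream-to-downstream order,
    // or throws if the graph contains a cycle (and so is not a DAG).
    static List<String> topologicalOrder(Map<String, List<String>> edges) {
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (String node : edges.keySet()) {
            inDegree.put(node, 0);
        }
        for (List<String> targets : edges.values()) {
            for (String target : targets) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }

        List<String> ready = new ArrayList<>();
        for (Map.Entry<String, Integer> entry : inDegree.entrySet()) {
            if (entry.getValue() == 0) {
                ready.add(entry.getKey());
            }
        }

        List<String> order = new ArrayList<>();
        while (!ready.isEmpty()) {
            String node = ready.remove(0);
            order.add(node);
            for (String target : edges.getOrDefault(node, Collections.<String>emptyList())) {
                if (inDegree.merge(target, -1, Integer::sum) == 0) {
                    ready.add(target);
                }
            }
        }
        if (order.size() != inDegree.size()) {
            throw new IllegalStateException("Graph contains a cycle");
        }
        return order;
    }
}
```

Calling `TopologyGraphSketch.topologicalOrder(TopologyGraphSketch.exampleEdges())` yields the order S1, B1, B2, B3, B4, with the spout first and every bolt after the components that feed it.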
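There are currently two APIs available that you can use to build Heron topologies: the higher-level Heron Streamlet API and the lower-level topology API, which is based on spouts and bolts.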
Once you've set up a Heron cluster, you can use Heron's CLI tool to manage the entire lifecycle of a topology, which typically goes through the following stages:
A topology's logical plan is analogous to a database query plan in that it maps out the basic operations associated with a topology. Here's an example logical plan for the example Streamlet API topology below:
Whether you use the Heron Streamlet API or the topology API, Heron automatically transforms the processing logic that you create into both a logical plan and a physical plan.
A topology's physical plan is related to its logical plan but with the crucial difference that a physical plan determines the "physical" execution logic of a topology, i.e. how topology processes are divided between containers. Here's a basic visual representation of a physical plan:
In this example, a Heron topology consists of one spout and five different bolts (each of which has multiple instances) that have automatically been distributed between five different containers.
When Heron was first created, the model for creating topologies was deeply indebted to the Apache Storm model. Under that model, developers creating topologies needed to explicitly define the behavior of every spout and bolt in the topology. Although this provided a powerful low-level API for creating topologies, that approach presented a variety of drawbacks for developers:
In contrast with the topology API, the Heron Streamlet API offers:
Type safety, because streamlets are typed (for example Streamlet<MyApplicationType>), which means that type errors can be caught at compile time rather than at runtime.
With the Heron Streamlet API you still create topologies, but only implicitly. Heron automatically performs the heavy lifting of converting the streamlet-based processing logic that you create into spouts and bolts and, from there, into containers that are then deployed using whichever scheduler your Heron cluster is using.
From the standpoint of both operators and developers managing topologies' lifecycles, the resulting topologies are equivalent. From a development workflow standpoint, however, the difference is profound.
The core construct underlying the Heron Streamlet API is that of the streamlet. A streamlet is a potentially unbounded, ordered collection of tuples. Streamlets can originate from a wide variety of sources, such as pub-sub messaging systems like Apache Kafka and Apache Pulsar (incubating), random generators, or static files like CSV or Parquet files.
In the Heron Streamlet API, processing data means transforming streamlets into other streamlets. This can be done using a wide variety of available operations, including many that you may be familiar with from functional programming:
Operation | Description |
---|---|
map | Returns a new streamlet by applying the supplied mapping function to each element in the original streamlet |
flatMap | Like a map operation, but the supplied function returns zero or more elements for each input element, and the results are flattened into a single streamlet |
join | Joins two separate streamlets into a single streamlet |
filter | Returns a new streamlet containing only the elements that satisfy the supplied filtering function |
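To make the semantics of these operations concrete, here is a minimal sketch that uses the standard `java.util.stream` API on finite lists as an analogy. It is not Heron code, the class and method names are hypothetical, and `join` is left out because it has no direct equivalent in `java.util.stream`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Analogy only: these helpers use java.util.stream on finite lists,
// not Heron streamlets. All names here are hypothetical.
final class OperationSemanticsSketch {
    // map: exactly one output element per input element.
    static List<Integer> addOne(List<Integer> input) {
        return input.stream()
            .map(i -> i + 1)
            .collect(Collectors.toList());
    }

    // flatMap: each input element yields zero or more output elements,
    // which are flattened into a single sequence.
    static List<String> splitIntoWords(List<String> sentences) {
        return sentences.stream()
            .flatMap(s -> Arrays.stream(s.split(" ")))
            .collect(Collectors.toList());
    }

    // filter: keeps only the elements that satisfy the predicate.
    static List<Integer> removeTwos(List<Integer> input) {
        return input.stream()
            .filter(i -> i != 2)
            .collect(Collectors.toList());
    }
}
```

For instance, `splitIntoWords` turns the two sentences "a b" and "c" into the three elements a, b, and c, whereas a plain `map` would have produced two arrays.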
You can see an example streamlet-based processing graph in the diagram below:
Here's the corresponding Java code for the processing logic shown in the diagram:
```java
package heron.streamlet.example;

import com.twitter.heron.streamlet.*;

import java.util.concurrent.ThreadLocalRandom;

public final class ExampleStreamletAPITopology {
    public ExampleStreamletAPITopology() {}

    // Returns a random integer between lower and upper, inclusive.
    // Static so that it can be called from main.
    private static int randomInt(int lower, int upper) {
        return ThreadLocalRandom.current().nextInt(lower, upper + 1);
    }

    public static void main(String[] args) {
        Builder builder = Builder.CreateBuilder();

        // Keep a reference to this streamlet so that it can be unioned in below.
        Streamlet<Integer> zeroes = builder.newSource(() -> 0)
            .setName("zeroes");

        builder.newSource(() -> randomInt(1, 10))
            .setName("random-ints")
            .map(i -> i + 1)
            .setName("add-one")
            .union(zeroes)
            .setName("unify-streams")
            .filter(i -> i != 2)
            .setName("remove-all-twos")
            .log();

        Config conf = new Config();
        conf.setNumContainers(2);

        new Runner().run("ExampleStreamletAPITopology", conf, builder);
    }
}
```
That Java code will produce this logical plan:
In order to perform some operations, such as streamlet joins and streamlet reduce operations, you'll need to create key-value streamlets.
In the topology API, processing parallelism is managed by adjusting the number of spouts and bolts performing different operations. You can, for example, increase the relative parallelism of a bolt by running three instances of that bolt instead of two.
The Heron Streamlet API provides a different mechanism for controlling parallelism: partitioning. To understand partitioning, keep in mind that rather than physical spouts and bolts, the core processing construct in the Heron Streamlet API is the processing step. With the Heron Streamlet API, you can explicitly assign a number of partitions to each processing step in your graph (the default is one partition).
The example topology above has five steps:
You could apply varying numbers of partitions to each step in that topology like this:
```java
Builder builder = Builder.CreateBuilder();

// Keep a reference to this streamlet so that it can be unioned in below.
Streamlet<Integer> zeroes = builder.newSource(() -> 0)
    .setName("zeroes");

builder.newSource(() -> ThreadLocalRandom.current().nextInt(1, 11))
    .setName("random-ints")
    .setNumPartitions(3)
    .map(i -> i + 1)
    .setName("add-one")
    .setNumPartitions(3)
    .union(zeroes)
    .setName("unify-streams")
    .setNumPartitions(2)
    .filter(i -> i != 2)
    .setName("remove-all-twos")
    .setNumPartitions(2)
    .log();
```
The number of partitions to assign to each processing step when using the Streamlet API depends on a variety of factors.
Windowed computations gather results from a topology or topology component within a specified finite time frame rather than, say, on a per-tuple basis.
Here are some examples of window operations:
Sliding windows are windows that overlap, as in this figure:
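For sliding windows, you need to specify two things: the length or duration of the window, and the sliding interval, which determines how far into the current window the next window begins.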
In the figure above, the duration of the window is 10 seconds, while the sliding interval is 5 seconds. Each new window begins five seconds into the current window.
With sliding time windows, data can be processed in more than one window. Tuples 3, 4, and 5 above are processed in both window 1 and window 2 while tuples 6, 7, and 8 are processed in both window 2 and window 3.
Setting the duration of a window to 16 seconds and the sliding interval to 12 seconds would produce this window arrangement:
Here, the sliding interval determines that a new window is always created 12 seconds into the current window.
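The relationship between window duration, sliding interval, and window membership can be sketched in a few lines of plain Java. This is an illustration rather than Heron's implementation: the class and method names are hypothetical, and it assumes that window n covers the half-open range from (n - 1) × interval up to, but not including, (n - 1) × interval + duration, with time measured in seconds from the start of the first window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of any Heron API.
final class SlidingWindowSketch {
    // Returns the 1-based numbers of every window that contains a tuple
    // arriving at timeSec. Assumes window n covers
    // [(n - 1) * slideSec, (n - 1) * slideSec + durationSec).
    static List<Long> windowsFor(long timeSec, long durationSec, long slideSec) {
        if (timeSec < 0 || durationSec <= 0 || slideSec <= 0) {
            throw new IllegalArgumentException("time must be >= 0; duration and slide must be > 0");
        }
        // Zero-based index of the latest window that has started by timeSec.
        long last = Math.floorDiv(timeSec, slideSec);
        // Zero-based index of the earliest window that has not yet ended at timeSec.
        long first = Math.max(0, Math.floorDiv(timeSec - durationSec, slideSec) + 1);

        List<Long> windows = new ArrayList<>();
        for (long k = first; k <= last; k++) {
            windows.add(k + 1); // convert to 1-based window numbers
        }
        return windows;
    }
}
```

With a 10-second duration and a 5-second interval, `windowsFor(7, 10, 5)` returns windows 1 and 2, which is the overlap described above. With a 16-second duration and a 12-second interval, a tuple at second 17 falls only in window 2, because window 1 ended at second 16.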
Tumbling windows are windows that don't overlap, as in this figure:
Tumbling windows don't overlap because a new window doesn't begin until the current window has elapsed. For tumbling windows, you need to specify only the length or duration of the window; there is no sliding interval.
With tumbling windows, data are never processed in more than one window because the windows never overlap.
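Because tumbling windows never overlap, assigning a tuple to its window reduces to an integer division. The sketch below is plain Java with hypothetical names, not Heron's implementation, and it assumes timestamps in seconds measured from the start of the first window.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper for illustration only; not part of any Heron API.
final class TumblingWindowSketch {
    // Groups tuple timestamps by 1-based window number. Window n is assumed
    // to cover [(n - 1) * durationSec, n * durationSec).
    static Map<Long, List<Long>> assign(List<Long> timestampsSec, long durationSec) {
        if (durationSec <= 0) {
            throw new IllegalArgumentException("duration must be > 0");
        }
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long t : timestampsSec) {
            long window = Math.floorDiv(t, durationSec) + 1;
            windows.computeIfAbsent(window, w -> new ArrayList<>()).add(t);
        }
        return windows;
    }
}
```

For timestamps 1, 9, 10, and 25 with a 10-second window, the result places 1 and 9 in window 1, 10 in window 2, and 25 in window 3; each timestamp appears exactly once.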
Count windows are specified on the basis of the number of tuples processed rather than a time interval. A count window of 100 would mean that a window would elapse after 100 tuples have been processed, with no relation to clock time.
With count windows, this scenario (for a count window of 50) would be completely normal:
Window | Tuples processed | Clock time |
---|---|---|
1 | 50 | 10 seconds |
2 | 50 | 12 seconds |
3 | 50 | 1 hour, 12 minutes |
4 | 50 | 5 seconds |
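A count window can be sketched as chunking a sequence of tuples into fixed-size groups, regardless of when each tuple arrived. The following is a minimal plain-Java illustration with hypothetical names, not Heron's implementation; it assumes tumbling (non-overlapping) count windows and returns only windows that are complete.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of any Heron API.
final class CountWindowSketch {
    // Splits tuples into consecutive windows of exactly `count` tuples each.
    // Trailing tuples that do not yet fill a window are left out, because
    // that window has not elapsed.
    static <T> List<List<T>> completeWindows(List<T> tuples, int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be > 0");
        }
        List<List<T>> windows = new ArrayList<>();
        for (int start = 0; start + count <= tuples.size(); start += count) {
            windows.add(new ArrayList<>(tuples.subList(start, start + count)));
        }
        return windows;
    }
}
```

Seven tuples with a count of 3 produce two complete windows; the seventh tuple waits for two more to arrive, however long that takes in clock time.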
Time windows differ from count windows because you need to specify a time duration (in seconds) rather than a number of tuples processed.
With time windows, this scenario (for a time window of 30 seconds) would be completely normal:
Window | Tuples processed | Clock time |
---|---|---|
1 | 150 | 30 seconds |
2 | 50 | 30 seconds |
3 | 0 | 30 seconds |
4 | 375 | 30 seconds |
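As explained above, windows differ along two axes: sliding (overlapping) vs. tumbling (non-overlapping) and count vs. time. This produces four types in total: sliding time windows, sliding count windows, tumbling time windows, and tumbling count windows.
When creating topologies using the Streamlet API, there are three types of resources that you can specify: the number of containers, CPU, and RAM.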
For each topology, there are defaults for each resource type:
Resource | Default | Minimum |
---|---|---|
Number of containers | 1 | 1 |
CPU | 1.0 | 1.0 |
RAM | 512 MB | 192 MB |
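The defaults and minimums in the table can be captured in a small value class that rejects requests below the minimum. This is only a sketch: the class, its fields, and its method are hypothetical and are not part of the Heron API; the numbers are taken from the table above.

```java
// Hypothetical value class for illustration only; not part of any Heron API.
final class TopologyResourcesSketch {
    // Defaults and minimums taken from the table above.
    static final int DEFAULT_CONTAINERS = 1;
    static final double DEFAULT_CPU = 1.0;
    static final int DEFAULT_RAM_MB = 512;
    static final int MIN_CONTAINERS = 1;
    static final double MIN_CPU = 1.0;
    static final int MIN_RAM_MB = 192;

    final int containers;
    final double cpu;
    final int ramMb;

    TopologyResourcesSketch(int containers, double cpu, int ramMb) {
        if (containers < MIN_CONTAINERS) {
            throw new IllegalArgumentException("containers must be >= " + MIN_CONTAINERS);
        }
        if (cpu < MIN_CPU) {
            throw new IllegalArgumentException("cpu must be >= " + MIN_CPU);
        }
        if (ramMb < MIN_RAM_MB) {
            throw new IllegalArgumentException("ramMb must be >= " + MIN_RAM_MB);
        }
        this.containers = containers;
        this.cpu = cpu;
        this.ramMb = ramMb;
    }

    // Returns a request that uses the default for every resource type.
    static TopologyResourcesSketch defaults() {
        return new TopologyResourcesSketch(DEFAULT_CONTAINERS, DEFAULT_CPU, DEFAULT_RAM_MB);
    }
}
```

Validating early in this way surfaces an undersized request, such as 100 MB of RAM, before a topology is submitted rather than at deployment time.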
For instructions on allocating resources to topologies, see the language-specific documentation for:
A Heron spout is a source of streams, responsible for emitting tuples into the topology. A spout may, for example, read data from a Kestrel queue or read tweets from the Twitter API and emit tuples to one or more bolts.
Information on building spouts can be found in Building Spouts.
A Heron bolt consumes streams of tuples emitted by spouts and performs some set of user-defined processing operations on those tuples, which may include performing complex stream transformations, performing storage operations, aggregating multiple streams into one, emitting tuples to other bolts within the topology, and much more.
Information on building bolts can be found in Building Bolts.
Heron's original topology API required using a fundamentally tuple-driven data model. You can find more information in Heron's Data Model.