Historically, Storm provided the Spout and Bolt APIs for expressing streaming computations. Though these APIs are fairly simple to use, they offer no reusable constructs for common streaming operations such as filtering, transformations, windowing, joins and aggregations.
The Stream API builds on top of Storm's spouts and bolts to provide a typed API for expressing streaming computations, and supports functional-style operations such as map-reduce.
Conceptually, a Stream can be thought of as a stream of messages flowing through a pipeline. A Stream may be generated by reading messages out of a source such as a spout, or by transforming other streams. For example,
// imports
import org.apache.storm.streams.Stream;
import org.apache.storm.streams.StreamBuilder;
...
StreamBuilder builder = new StreamBuilder();
// a stream of sentences obtained from a source spout
Stream<String> sentences = builder.newStream(new RandomSentenceSpout()).map(tuple -> tuple.getString(0));
// a stream of words obtained by transforming (splitting) the stream of sentences
Stream<String> words = sentences.flatMap(s -> Arrays.asList(s.split(" ")));
// output operation that prints the words to console
words.forEach(w -> System.out.println(w));
Most stream operations accept parameters that describe user-specified behavior, typically via lambda expressions such as s -> Arrays.asList(s.split(" ")) in the example above.
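A Stream supports two kinds of operations:
1. Transformations, which produce another stream from the current stream (like the flatMap operation in the example above).
2. Output operations, which produce a result (like the forEach operation in the example above).
StreamBuilder provides the builder APIs to create a new stream. Typically a spout forms the source of a stream.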
StreamBuilder builder = new StreamBuilder();
Stream<Tuple> sentences = builder.newStream(new TestSentenceSpout());
The StreamBuilder tracks the overall pipeline of operations expressed via the Stream. One can then create the Storm topology via build() and submit it like a normal Storm topology via StormSubmitter.
StormSubmitter.submitTopologyWithProgressBar("test", new Config(), builder.build());
Value mappers can be used to extract specific fields from the tuples emitted from a spout to produce a typed stream of values. Value mappers are passed as arguments to StreamBuilder.newStream.
StreamBuilder builder = new StreamBuilder();
// extract the first field from the tuple to get a Stream<String> of sentences
Stream<String> sentences = builder.newStream(new TestWordSpout(), new ValueMapper<String>(0));
Storm provides strongly typed tuples via the Pair and Tuple classes (Tuple3 up to Tuple10). One can use a TupleValueMapper to produce a stream of typed tuples, as shown below.
// extract first three fields of the tuple emitted by the spout to produce a stream of typed tuples.
Stream<Tuple3<String, Integer, Long>> stream = builder.newStream(new TestSpout(), TupleValueMappers.of(0, 1, 2));
Storm's streaming APIs (defined in Stream and PairStream) currently support a wide range of operations, including transformations, filters, windowing, aggregations, branching, joins, and stateful, output and debugging operations.
filter returns a stream consisting of the elements of the stream that match the given Predicate (those for which the predicate returns true).
Stream<String> logs = ...
Stream<String> errors = logs.filter(line -> line.contains("ERROR"));
In the above example, log lines containing ‘ERROR’ are filtered into an error stream, which can then be processed further.
map returns a stream consisting of the result of applying the given mapping function to the values of the stream.
Stream<String> words = ...
Stream<Integer> wordLengths = words.map(String::length);
The example generates a stream of word lengths from a stream of words by applying the String.length function on each value. Note that the type of the resultant stream of a map operation can be different from that of the original stream.
flatMap returns a stream consisting of the results of replacing each value of the stream with the contents produced by applying the provided mapping function to each value. This is similar to map, but each value can be mapped to 0 or more values.
Stream<String> sentences = ...
Stream<String> words = sentences.flatMap(s -> Arrays.asList(s.split(" ")));
In the above example, the lambda function splits each value in the stream to a list of words and the flatMap function generates a flattened stream of words out of it.
A window operation produces a windowed stream consisting of the elements that fall within the window as specified by the window parameter. All the windowing options supported in the underlying windowed bolts are supported via the Stream APIs.
Stream<T> windowedStream = stream.window(Window<?, ?> windowConfig);
The windowConfig parameter specifies the windowing config like sliding or tumbling windows based on time duration or event count.
// time based sliding window
stream.window(SlidingWindows.of(Duration.minutes(10), Duration.minutes(1)));
// count based sliding window
stream.window(SlidingWindows.of(Count.of(10), Count.of(2)));
// tumbling window
stream.window(TumblingWindows.of(Duration.seconds(10)));
// specifying timestamp field for event time based processing and a late tuple stream.
stream.window(TumblingWindows.of(Duration.seconds(10))
    .withTimestampField("ts")
    .withLateTupleStream("late_events"));
A windowing operation splits the continuous stream of values into subsets and is necessary for performing operations like Joins and Aggregations.
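To make the window parameters concrete, the following plain-Java sketch (no Storm dependency) mimics how a count-based window groups values. The names CountWindowSketch and windows are made up for illustration, and the logic is a simplification rather than Storm's actual windowing implementation: it emits one window after every `slide` values, each holding at most the last `length` values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CountWindowSketch {
    // Emits one window after every `slide` values; each window holds
    // at most the last `length` values seen so far.
    static <T> List<List<T>> windows(List<T> values, int length, int slide) {
        List<List<T>> result = new ArrayList<>();
        for (int end = slide; end <= values.size(); end += slide) {
            int start = Math.max(0, end - length);
            result.add(new ArrayList<>(values.subList(start, end)));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Integer> values = Arrays.asList(1, 2, 3, 4, 5, 6);
        // length == slide: non-overlapping (tumbling) windows
        System.out.println(windows(values, 2, 2)); // [[1, 2], [3, 4], [5, 6]]
        // length > slide: overlapping (sliding) windows
        System.out.println(windows(values, 4, 2)); // [[1, 2], [1, 2, 3, 4], [3, 4, 5, 6]]
    }
}
```

When the length equals the sliding interval the windows do not overlap, which corresponds to a tumbling window; when the length is larger, consecutive windows share values.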
The mapToPair and flatMapToPair operations transform a Stream of values into a stream of key-value pairs.
Stream<Integer> integers = … // 1, 2, 3, 4, ...
PairStream<Integer, Integer> squares = integers.mapToPair(x -> Pair.of(x, x*x)); // (1, 1), (2, 4), (3, 9), (4, 16), ...
A key-value pair stream is required for operations like groupByKey, aggregateByKey, joins etc.
Aggregate operations aggregate the values (or key-values) in a stream. Typically the aggregation operations are performed on a windowed stream where the aggregate results are emitted on each window activation.
aggregate and reduce compute a global aggregation, i.e. the values across all partitions are forwarded to a single task for computing the aggregate.
Stream<Long> numbers = …
// aggregate the numbers and produce a stream of last 10 sec sums.
Stream<Long> sums = numbers.window(TumblingWindows.of(Duration.seconds(10))).aggregate(new Sum());
// the last 10 sec sums computed using reduce
Stream<Long> sumsViaReduce = numbers.window(...).reduce((x, y) -> x + y);
aggregate and reduce differ in the way the aggregate results are computed.
A reduce operation repeatedly applies the given reducer, reducing two values to a single value until only one value is left. This may not be feasible or easy for all kinds of aggregations (e.g. avg).
An aggregate operation does a mutable reduction. A mutable reduction accumulates results into an accumulator as it processes the values.
The aggregation operations (aggregate and reduce) automatically do a local aggregation whenever possible before the network shuffle, to minimize the number of messages transmitted over the network. For example, to compute a sum, a per-partition partial sum is computed and only the partial sums are transferred over the network to the target bolt, where they are merged to produce the final sum. A CombinerAggregator interface is used as the argument of aggregate to enable this.
For example, the Sum (passed as the argument of aggregate in the example above) can be implemented as a CombinerAggregator as follows.
public class Sum implements CombinerAggregator<Long, Long, Long> {
    // The initial value of the sum
    @Override
    public Long init() {
        return 0L;
    }

    // Updates the sum by adding the value (this could be a partial sum)
    @Override
    public Long apply(Long aggregate, Long value) {
        return aggregate + value;
    }

    // merges the partial sums
    @Override
    public Long merge(Long accum1, Long accum2) {
        return accum1 + accum2;
    }

    // extract result from the accumulator (here the accumulator and result is the same)
    @Override
    public Long result(Long accum) {
        return accum;
    }
}
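The same four methods also cover aggregations that a plain reduce cannot express directly, such as an average, because the accumulator type may differ from both the input and the result. The sketch below is self-contained plain Java: the nested CombinerAggregator interface is a stand-in that mirrors the four methods used by Sum above, not the Storm interface itself, and Average, aggregatePartition and the hard-coded partitions are hypothetical names chosen for illustration.

```java
import java.util.Arrays;
import java.util.List;

class AverageSketch {
    // Stand-in mirroring the four methods of the Sum example (assumption, not the Storm type).
    interface CombinerAggregator<T, A, R> {
        A init();
        A apply(A accumulator, T value);
        A merge(A accum1, A accum2);
        R result(A accum);
    }

    // The accumulator is a {sum, count} pair; the result is the average.
    static class Average implements CombinerAggregator<Long, long[], Double> {
        public long[] init() { return new long[] {0L, 0L}; }
        public long[] apply(long[] acc, Long value) { return new long[] {acc[0] + value, acc[1] + 1}; }
        public long[] merge(long[] a, long[] b) { return new long[] {a[0] + b[0], a[1] + b[1]}; }
        public Double result(long[] acc) { return acc[1] == 0 ? 0.0 : (double) acc[0] / acc[1]; }
    }

    // Local aggregation over the values of one partition.
    static <T, A> A aggregatePartition(CombinerAggregator<T, A, ?> agg, List<T> partition) {
        A acc = agg.init();
        for (T value : partition) {
            acc = agg.apply(acc, value);
        }
        return acc;
    }

    // Aggregates each partition locally, then merges only the partial accumulators.
    static double average(List<Long> partition1, List<Long> partition2) {
        Average avg = new Average();
        long[] partial1 = aggregatePartition(avg, partition1);
        long[] partial2 = aggregatePartition(avg, partition2);
        return avg.result(avg.merge(partial1, partial2));
    }

    public static void main(String[] args) {
        System.out.println(average(Arrays.asList(1L, 2L, 3L), Arrays.asList(10L))); // 4.0
    }
}
```

Only the two small {sum, count} accumulators would need to cross the network in this scheme, regardless of how many values each partition holds.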
The aggregateByKey and reduceByKey operations are similar to aggregate and reduce, but perform the aggregation per key.
aggregateByKey aggregates the values for each key of the stream using the given Aggregator.
Stream<String> words = ... // a windowed stream of words
PairStream<String, Long> wordCounts = words
    .mapToPair(w -> Pair.of(w, 1)) // convert to a stream of (word, 1) pairs
    .aggregateByKey(new Count<>()); // compute counts per word
reduceByKey performs a reduction on the values for each key of this stream by repeatedly applying the reducer.
Stream<String> words = ... // a windowed stream of words
PairStream<String, Long> wordCounts = words
    .mapToPair(w -> Pair.of(w, 1L)) // convert to a stream of (word, 1) pairs
    .reduceByKey((x, y) -> x + y); // compute counts per word
As with the global aggregate/reduce, a per-partition local aggregate (per key) is computed and the partial results are sent to the target bolts, where they are merged to produce the final aggregate.
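The two-phase, per-key aggregation described above can be illustrated without Storm. In this minimal sketch, assuming two in-memory lists stand in for partitions, localCounts plays the role of the per-partition partial aggregate and mergeCounts the role of the target bolt; both names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class PerKeyCombineSketch {
    // Phase 1: partial counts computed locally within one partition.
    static Map<String, Long> localCounts(List<String> partition) {
        Map<String, Long> counts = new TreeMap<>();
        for (String word : partition) {
            counts.merge(word, 1L, Long::sum);
        }
        return counts;
    }

    // Phase 2: partial counts for the same key are merged into the final count.
    static Map<String, Long> mergeCounts(List<Map<String, Long>> partials) {
        Map<String, Long> merged = new TreeMap<>();
        for (Map<String, Long> partial : partials) {
            partial.forEach((word, count) -> merged.merge(word, count, Long::sum));
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Long> p1 = localCounts(Arrays.asList("a", "b", "a")); // {a=2, b=1}
        Map<String, Long> p2 = localCounts(Arrays.asList("b", "c"));      // {b=1, c=1}
        System.out.println(mergeCounts(Arrays.asList(p1, p2)));           // {a=2, b=2, c=1}
    }
}
```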
groupByKey on a stream of key-value pairs returns a new stream where the values are grouped by the keys.
// a stream of (user, score) pairs e.g. ("alice", 10), ("bob", 15), ("bob", 20), ("alice", 11), ("alice", 13)
PairStream<String, Integer> scores = ...
// list of scores per user in the last window, e.g. ("alice", [10, 11, 13]), ("bob", [15, 20])
PairStream<String, Iterable<Integer>> userScores = scores.window(...).groupByKey();
countByKey counts the values for each key of this stream.
Stream<String> words = ... // a windowed stream of words
PairStream<String, Long> wordCounts = words
    .mapToPair(w -> Pair.of(w, 1)) // convert to a stream of (word, 1) pairs
    .countByKey(); // compute counts per word
Internally, countByKey uses aggregateByKey to compute the count.
A repartition operation re-partitions the current stream and returns a new stream with the specified number of partitions. Further operations on the resultant stream execute at that level of parallelism. Repartition can be used to increase or reduce the parallelism of the operations in the stream.
The initial number of partitions can also be specified while creating the stream (via StreamBuilder.newStream).
// Stream 's1' will have 2 partitions and operations on s1 will execute at this level of parallelism
Stream<String> s1 = builder.newStream(new TestWordSpout(), new ValueMapper<String>(0), 2);
// Stream 's2' and further operations will have three partitions
// (function1 is assumed here to map a String to an Integer)
Stream<Integer> s2 = s1.map(function1).repartition(3);
// perform a map operation on s2 and print the result
s2.map(function2).print();
Note: a repartition operation implies network transfer. In the above example the first map operation (function1) would be executed at a parallelism of 2 (on two partitions of s1), whereas the second map operation (function2) would be executed at a parallelism of 3 (on three partitions of s2). This also means that the first and second map operations have to be executed on two separate bolts, which involves network transfer.
Output operations push out the transformed values in the stream to the console, external sinks like databases, files or even Storm bolts.
print prints the values in the stream to console. For example,
// transforms words to uppercase and prints to the console
words.map(String::toUpperCase).print();
peek returns a stream consisting of the elements of the stream, additionally performing the provided action on each element as it is consumed from the resulting stream. This can be used to ‘inspect’ the values flowing at any stage in a stream.
builder.newStream(...).flatMap(s -> Arrays.asList(s.split(" ")))
    // print the results of the flatMap operation as the values flow across the stream.
    .peek(s -> System.out.println(s))
    .mapToPair(w -> Pair.of(w, 1))
forEach is the most generic output operation and can be used to execute arbitrary code for each value in the stream, such as storing the results in an external database or file.
stream.forEach(value -> {
    // log it
    LOG.debug(value);
    // store the value into a db and so on...
    statement.executeUpdate(..);
});
The to operation forwards the values of a stream to an existing bolt, which allows one to plug in existing bolts as sinks.
// The redisBolt is a standard storm bolt
IRichBolt redisBolt = new RedisStoreBolt(poolConfig, storeMapper);
...
// generate the word counts and store it in redis using redis bolt
builder.newStream(new TestWordSpout(), new ValueMapper<String>(0))
    .mapToPair(w -> Pair.of(w, 1))
    .countByKey()
    // the (word, count) pairs are forwarded to the redisBolt which stores it in redis
    .to(redisBolt);
Note that this will provide guarantees only based on what the bolt provides.
A branch operation can be used to express if-then-else logic on streams.
Stream<T>[] streams = stream.branch(Predicate<T>... predicates)
The predicates are applied in the given order to the values of the stream and the result is forwarded to the corresponding (index based) result stream based on the first predicate that matches. If none of the predicates match a value, that value is dropped.
For example,
Stream<Integer>[] streams = builder.newStream(new RandomIntegerSpout(), new ValueMapper<Integer>(0))
    .branch(x -> (x % 2) == 0, x -> (x % 2) == 1);
Stream<Integer> evenNumbers = streams[0];
Stream<Integer> oddNumbers = streams[1];
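The first-match routing rule can be sketched in plain Java, independent of Storm. BranchSketch and its branch method are hypothetical stand-ins operating on an in-memory list; they only illustrate the semantics described above (predicates tried in order, first match wins, unmatched values dropped).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class BranchSketch {
    // Routes each value to the output list of the first predicate it satisfies.
    @SafeVarargs
    static <T> List<List<T>> branch(List<T> values, Predicate<T>... predicates) {
        List<List<T>> outputs = new ArrayList<>();
        for (int i = 0; i < predicates.length; i++) {
            outputs.add(new ArrayList<>());
        }
        for (T value : values) {
            for (int i = 0; i < predicates.length; i++) {
                if (predicates[i].test(value)) {
                    outputs.get(i).add(value);
                    break; // later predicates are not consulted
                }
            }
            // a value that matches no predicate is dropped
        }
        return outputs;
    }

    static List<List<Integer>> demo() {
        return branch(Arrays.asList(1, 2, 3, 4, 15, 30),
                x -> x % 15 == 0,  // index 0: multiples of 15
                x -> x % 2 == 0);  // index 1: remaining even numbers
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [[15, 30], [2, 4]]
    }
}
```

Here 30 satisfies both predicates but lands only in the first output, while 1 and 3 match neither and are dropped. The same rule matters for the even/odd example: in Java, `x % 2` is -1 for negative odd numbers, so if the spout can emit negative values they would match neither predicate.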
A join operation joins the values of one stream with the values having the same key from another stream.
PairStream<Long, Long> squares = … // (1, 1), (2, 4), (3, 9) ...
PairStream<Long, Long> cubes = … // (1, 1), (2, 8), (3, 27) ...
// join the squares and cubes stream to produce (1, [1, 1]), (2, [4, 8]), (3, [9, 27]) ...
PairStream<Long, Pair<Long, Long>> joined = squares.window(TumblingWindows.of(Duration.seconds(5))).join(cubes);
Joins are typically invoked on a windowed stream, joining the key-values that arrived on each stream in the current window. The parallelism of the stream on which the join is invoked is carried forward to the joined stream. An optional ValueJoiner can be passed as an argument to join to specify how to join the two values for each matching key (the default behavior is to return a Pair of the values from both streams).
Left, right and full outer joins are supported.
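As a rough illustration of what an inner join does with the contents of a single window, here is a plain-Java sketch with no Storm dependency. It is deliberately simplified: it assumes at most one value per key on each side, and WindowJoinSketch, innerJoin and the joiner function are hypothetical names, with the joiner playing a role similar to the ValueJoiner mentioned above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiFunction;

class WindowJoinSketch {
    // Inner join: only keys present on both sides produce a result.
    static <K, V1, V2, R> Map<K, R> innerJoin(Map<K, V1> left, Map<K, V2> right,
                                              BiFunction<V1, V2, R> joiner) {
        Map<K, R> joined = new LinkedHashMap<>();
        for (Map.Entry<K, V1> entry : left.entrySet()) {
            K key = entry.getKey();
            if (right.containsKey(key)) {
                joined.put(key, joiner.apply(entry.getValue(), right.get(key)));
            }
        }
        return joined;
    }

    static Map<Long, String> demo() {
        Map<Long, Long> squares = new LinkedHashMap<>();
        squares.put(1L, 1L);
        squares.put(2L, 4L);
        squares.put(3L, 9L);
        Map<Long, Long> cubes = new LinkedHashMap<>();
        cubes.put(2L, 8L);
        cubes.put(3L, 27L);
        cubes.put(4L, 64L);
        // The joiner decides how the two values for a matching key are combined.
        return innerJoin(squares, cubes, (s, c) -> "[" + s + ", " + c + "]");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {2=[4, 8], 3=[9, 27]}
    }
}
```

Keys 1 and 4 appear on only one side and are therefore absent from the inner join; an outer join would keep them with a missing value on the other side.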
coGroupByKey groups the values of this stream with the values having the same key from the other stream.
// a stream of (key, value) pairs e.g. (k1, v1), (k2, v2), (k2, v3)
PairStream<String, String> stream1 = ...
// another stream of (key, value) pairs e.g. (k1, x1), (k1, x2), (k3, x3)
PairStream<String, String> stream2 = ...
// the co-grouped values per key in the last window, e.g. (k1, ([v1], [x1, x2])), (k2, ([v2, v3], [])), (k3, ([], [x3]))
PairStream<String, Pair<Iterable<String>, Iterable<String>>> coGroupedStream = stream1.window(...).coGroupByKey(stream2);
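The co-grouping in the comment above can be reproduced with a small plain-Java sketch. It is an illustration only, with no Storm dependency: CoGroupSketch and coGroup are hypothetical names, each (key, value) pair is modelled as a two-element String array, and the per-key result is a two-element list holding the values from the first and second input.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class CoGroupSketch {
    // Result: key -> [values from stream1, values from stream2]
    static Map<String, List<List<String>>> coGroup(List<String[]> stream1, List<String[]> stream2) {
        Map<String, List<List<String>>> grouped = new TreeMap<>();
        for (String[] kv : stream1) {
            slot(grouped, kv[0]).get(0).add(kv[1]);
        }
        for (String[] kv : stream2) {
            slot(grouped, kv[0]).get(1).add(kv[1]);
        }
        return grouped;
    }

    // Returns the pair of value lists for a key, creating two empty lists on first use.
    private static List<List<String>> slot(Map<String, List<List<String>>> grouped, String key) {
        return grouped.computeIfAbsent(key, k -> {
            List<List<String>> sides = new ArrayList<>();
            sides.add(new ArrayList<>());
            sides.add(new ArrayList<>());
            return sides;
        });
    }

    static Map<String, List<List<String>>> demo() {
        List<String[]> stream1 = Arrays.asList(
                new String[] {"k1", "v1"}, new String[] {"k2", "v2"}, new String[] {"k2", "v3"});
        List<String[]> stream2 = Arrays.asList(
                new String[] {"k1", "x1"}, new String[] {"k1", "x2"}, new String[] {"k3", "x3"});
        return coGroup(stream1, stream2);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {k1=[[v1], [x1, x2]], k2=[[v2, v3], []], k3=[[], [x3]]}
    }
}
```

Unlike an inner join, every key seen on either side appears in the output, with an empty list for the side that had no values.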
Storm provides APIs for applications to save and update the state of their computation and to query that state.
updateStateByKey updates the state by applying a given state update function to the previous state and the new value for the key. It can be invoked either with an initial value for the state and a state update function, or by directly providing a StateUpdater implementation.
PairStream<String, Long> wordCounts = ...
// Update the word counts in the state; here the first argument 0L is the initial value for the state and
// the second argument is a function that adds the count to the current value in the state.
StreamState<String, Long> streamState = wordCounts.updateStateByKey(0L, (state, count) -> state + count);
streamState.toPairStream().print();
The state value can be of any type. In the above example it is of type Long and stores the word count.
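The update rule (new state = update function applied to the previous state, or the initial value, and the incoming value) can be sketched in plain Java. KeyedStateSketch is a hypothetical in-memory stand-in used only to show the semantics; it does not reflect how Storm stores or checkpoints state.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

class KeyedStateSketch<K, V, S> {
    private final Map<K, S> state = new HashMap<>();
    private final S initialValue;
    private final BiFunction<S, V, S> updater;

    KeyedStateSketch(S initialValue, BiFunction<S, V, S> updater) {
        this.initialValue = initialValue;
        this.updater = updater;
    }

    // Applies the update function to the previous state (or the initial value) and the new value.
    S update(K key, V value) {
        S previous = state.getOrDefault(key, initialValue);
        S next = updater.apply(previous, value);
        state.put(key, next);
        return next;
    }

    // Looks up the current state for a key; null if the key was never updated.
    S query(K key) {
        return state.get(key);
    }

    public static void main(String[] args) {
        KeyedStateSketch<String, Long, Long> counts =
                new KeyedStateSketch<>(0L, (total, count) -> total + count);
        counts.update("storm", 2L);
        counts.update("stream", 1L);
        counts.update("storm", 3L);
        System.out.println(counts.query("storm")); // 5
        System.out.println(counts.query("bolt"));  // null
    }
}
```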
Internally, Storm uses stateful bolts for storing the state. The Storm config topology.state.provider can be used to choose the state provider implementation. For example, set this to org.apache.storm.redis.state.RedisKeyValueStateProvider for a Redis-based state store.
stateQuery can be used to query the state (updated by updateStateByKey). The StreamState returned by the updateStateByKey operation has to be used for querying stream state. The values in the stream are used as the keys to query the state.
// The stream of words emitted by the QuerySpout is used as the keys to query the state.
builder.newStream(new QuerySpout(), new ValueMapper<String>(0))
    // Queries the state and emits the matching (key, value) as results.
    // The stream state returned by updateStateByKey is passed as the argument to stateQuery.
    .stateQuery(streamState).print();
Currently, topologies built using the Stream API provide an at-least-once guarantee.
Note that only the updateStateByKey operation currently executes on an underlying StatefulBolt. The other stateful operations (join, windowing, aggregation etc.) execute on an IRichBolt and store their state in memory, relying on Storm's acking and replay mechanisms to rebuild the state.
In the future, the underlying framework of the Stream API would be enhanced to provide exactly-once guarantees.
Here's a word count topology expressed using the Stream API,
StreamBuilder builder = new StreamBuilder();
builder
    // A stream of random sentences with two partitions
    .newStream(new RandomSentenceSpout(), new ValueMapper<String>(0), 2)
    // a two seconds tumbling window
    .window(TumblingWindows.of(Duration.seconds(2)))
    // split the sentences to words
    .flatMap(s -> Arrays.asList(s.split(" ")))
    // create a stream of (word, 1) pairs
    .mapToPair(w -> Pair.of(w, 1))
    // compute the word counts in the last two second window
    .countByKey()
    // print the results to stdout
    .print();
The RandomSentenceSpout is a regular Storm spout that continuously emits random sentences. The stream of sentences is split into two-second windows, and the word count within each window is computed and printed.
The stream can then be submitted just like a regular topology as shown below.
Config config = new Config();
config.setNumWorkers(1);
StormSubmitter.submitTopologyWithProgressBar("topology-name", config, builder.build());
More examples are available under storm-starter which will help you get started.