Apache Beam’s primitives let you build expressive data pipelines, suitable for a variety of use cases. One specific use case is the analysis of time series data in which continuous sequences across window boundaries are important. A few fun challenges arise as you tackle this type of data. In this blog post we explore one of them in more detail and make use of the Timer API ([blog post]({{ site.baseurl }}/blog/2017/08/28/timely-processing.html)) through the “looping timer” pattern.
With Beam in streaming mode, you can take streams of data and build analytical transforms to produce results on the data. But for time series data, the absence of data is useful information. So how can we produce results in the absence of data?
Let's use a more concrete example to illustrate the requirement. Imagine you have a simple pipeline that sums the number of events coming from an IoT device every minute. We would like to produce the value 0 when no data has been seen within a specific time interval. So why can this get tricky? Well, it is easy to build a simple pipeline that counts events as they arrive, but when there is no event, there is nothing to count!
Let's build a simple pipeline to work with:

    // We will start our timer at 1 sec from the fixed upper boundary of our
    // minute window
    Instant now = Instant.parse("2000-01-01T00:00:59Z");

    // ----- Create some dummy data

    // Create 3 elements, incrementing by 1 minute and leaving a time gap between
    // element 2 and element 3
    TimestampedValue<KV<String, Integer>> time_1 =
        TimestampedValue.of(KV.of("Key_A", 1), now);

    TimestampedValue<KV<String, Integer>> time_2 =
        TimestampedValue.of(KV.of("Key_A", 2),
            now.plus(Duration.standardMinutes(1)));

    // No Value for start time + 2 mins
    TimestampedValue<KV<String, Integer>> time_3 =
        TimestampedValue.of(KV.of("Key_A", 3),
            now.plus(Duration.standardMinutes(3)));

    // Create pipeline
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
        .as(PipelineOptions.class);

    Pipeline p = Pipeline.create(options);

    // Apply a fixed window of duration 1 min and Sum the results
    p.apply(Create.timestamped(time_1, time_2, time_3))
        .apply(
            Window.<KV<String,Integer>>into(
                FixedWindows.<Integer>of(Duration.standardMinutes(1))))
        .apply(Sum.integersPerKey())
        .apply(ParDo.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {

          @ProcessElement
          public void process(ProcessContext c) {
            LOG.info("Value is {} timestamp is {}", c.element(), c.timestamp());
          }
        }));

    p.run();

Running that pipeline will result in the following output:

    INFO LoopingTimer - Value is KV{Key_A, 1} timestamp is 2000-01-01T00:00:59.999Z
    INFO LoopingTimer - Value is KV{Key_A, 3} timestamp is 2000-01-01T00:03:59.999Z
    INFO LoopingTimer - Value is KV{Key_A, 2} timestamp is 2000-01-01T00:01:59.999Z

Note: The lack of order in the output is expected; the result for each key-window tuple is still computed correctly.
As expected, we see output in each interval window that had a data point with a timestamp between the window's minimum and maximum values. There were data points at timestamps 00:00:59, 00:01:59 and 00:03:59, which fell into the interval windows [00:00:00, 00:00:59.999), [00:01:00, 00:01:59.999) and [00:03:00, 00:03:59.999) respectively.
But as there was no data between 00:02:00 and 00:02:59, no value is produced for interval window [00:02:00, 00:02:59.999).
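To make the window boundaries concrete, here is a minimal plain-Java sketch of the arithmetic behind one-minute fixed windows. It does not use Beam, and the class and method names (`FixedWindowMath`, `windowStart`, `windowEnd`) are made up for this illustration; it only assumes that windows are aligned to the epoch, as in the example above.

```java
import java.time.Duration;
import java.time.Instant;

/** Plain-Java sketch of fixed-window assignment; this is not Beam's implementation. */
final class FixedWindowMath {

  /** Returns the inclusive start of the epoch-aligned window containing the timestamp. */
  static Instant windowStart(Instant timestamp, Duration size) {
    long sizeMillis = size.toMillis();
    long millis = timestamp.toEpochMilli();
    // floorMod keeps the arithmetic correct for timestamps before the epoch.
    return Instant.ofEpochMilli(millis - Math.floorMod(millis, sizeMillis));
  }

  /** Returns the exclusive end of the window containing the timestamp. */
  static Instant windowEnd(Instant timestamp, Duration size) {
    return windowStart(timestamp, size).plus(size);
  }

  public static void main(String[] args) {
    Duration oneMinute = Duration.ofMinutes(1);
    String[] eventTimes = {
      "2000-01-01T00:00:59Z", "2000-01-01T00:01:59Z", "2000-01-01T00:03:59Z"
    };
    // Print the window each sample element falls into.
    for (String eventTime : eventTimes) {
      Instant ts = Instant.parse(eventTime);
      System.out.println(
          ts + " -> [" + windowStart(ts, oneMinute) + ", " + windowEnd(ts, oneMinute) + ")");
    }
  }
}
```

None of the three sample timestamps maps to the window starting at 00:02:00, which is why that window produces no output. The `.999` timestamps in the log are one millisecond before each window's exclusive end.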
How can we get Beam to output values for that missing window? First, let’s walk through some options that do not make use of the Timer API.
Option 1: We can use an external system to emit a value for each time interval and inject it into the stream of data that Beam consumes. This simple option moves any complexity out of the Beam pipeline. But using an external system means we need to monitor this system and perform other maintenance tasks in tandem with the Beam pipeline.
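Option 2: We can use a generating source inside the Beam pipeline to emit the value, using this code snippet:

    pipeline.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))

We can then convert each generated value to zero in a DoFn and flatten the result with the real source, giving a PCollection that has a tick in every time interval. This is also a simple way of producing a value in each time interval.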
Both options 1 and 2 work well for the case where the pipeline processes a single key. Let’s now deal with the case where instead of one IoT device, there are thousands or hundreds of thousands of these devices, each with a unique key. To make option 1 or option 2 work in this scenario, we need to carry out an extra step: creating a FanOut DoFn. Each tick needs to be distributed to all the potential keys, so we need to create a FanOut DoFn that takes the dummy value and generates a key-value pair for every available key.
For example, let's assume we have 3 keys for 3 IoT devices, {key1,key2,key3}. Using the method we outlined in Option 2 when we get the first element from GenerateSequence, we need to create a loop in the DoFn that generates 3 key-value pairs. These pairs become the heartbeat value for each of the IoT devices.
And things get a lot more fun when we need to deal with lots of IoT devices, with a list of keys that are dynamically changing. We would need to add a transform that does a Distinct operation and feed the data produced as a side-input into the FanOut DoFn.
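The fan-out step can be sketched in plain Java as follows. This is only an illustration of the loop, not Beam code: the class `HeartbeatFanOut` and its `fanOut` method are hypothetical names, and in a real pipeline the loop would sit inside a DoFn's `@ProcessElement` method, with the list of keys typically supplied as a side input.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of the fan-out loop; in Beam this would live inside a DoFn. */
final class HeartbeatFanOut {

  /** Produces one zero-valued heartbeat pair per known key for a single tick. */
  static List<Map.Entry<String, Integer>> fanOut(List<String> knownKeys) {
    List<Map.Entry<String, Integer>> heartbeats = new ArrayList<>(knownKeys.size());
    for (String key : knownKeys) {
      // A value of 0 leaves any per-key sum unchanged while still marking the interval.
      heartbeats.add(Map.entry(key, 0));
    }
    return heartbeats;
  }

  public static void main(String[] args) {
    // One tick from the generating source fans out to all three devices.
    List<String> keys = List.of("key1", "key2", "key3");
    System.out.println(fanOut(keys));
  }
}
```

Every tick costs one output element per key, so the work grows with the number of devices even when most of them are actively sending data.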
So how do timers help? Well let's have a look at a new transform:
Edit: Looping Timer State changed from Boolean to Long to allow for min value check.

    public static class LoopingStatefulTimer
        extends DoFn<KV<String, Integer>, KV<String, Integer>> {

      Instant stopTimerTime;

      LoopingStatefulTimer(Instant stopTime) {
        this.stopTimerTime = stopTime;
      }

      @StateId("loopingTimerTime")
      private final StateSpec<ValueState<Long>> loopingTimerTime =
          StateSpecs.value(BigEndianLongCoder.of());

      @StateId("key")
      private final StateSpec<ValueState<String>> key =
          StateSpecs.value(StringUtf8Coder.of());

      @TimerId("loopingTimer")
      private final TimerSpec loopingTimer =
          TimerSpecs.timer(TimeDomain.EVENT_TIME);

      @ProcessElement
      public void process(ProcessContext c,
          @StateId("key") ValueState<String> key,
          @StateId("loopingTimerTime") ValueState<Long> loopingTimerTime,
          @TimerId("loopingTimer") Timer loopingTimer) {

        // If the timer has been set already, or if the value is smaller than
        // the current element + window duration, do not set
        Long currentTimerValue = loopingTimerTime.read();
        Instant nextTimerTimeBasedOnCurrentElement =
            c.timestamp().plus(Duration.standardMinutes(1));

        if (currentTimerValue == null
            || currentTimerValue > nextTimerTimeBasedOnCurrentElement.getMillis()) {
          loopingTimer.set(nextTimerTimeBasedOnCurrentElement);
          loopingTimerTime.write(nextTimerTimeBasedOnCurrentElement.getMillis());
        }

        // We need this value so that we can output a value for the correct key in OnTimer
        if (key.read() == null) {
          key.write(c.element().getKey());
        }

        c.output(c.element());
      }

      @OnTimer("loopingTimer")
      public void onTimer(
          OnTimerContext c,
          @StateId("key") ValueState<String> key,
          @TimerId("loopingTimer") Timer loopingTimer) {

        LOG.info("Timer @ {} fired", c.timestamp());
        c.output(KV.of(key.read(), 0));

        // If we do not put in a "time to live" value, then the timer would loop forever
        Instant nextTimer = c.timestamp().plus(Duration.standardMinutes(1));
        if (nextTimer.isBefore(stopTimerTime)) {
          loopingTimer.set(nextTimer);
        } else {
          LOG.info(
              "Timer not being set as exceeded Stop Timer value {} ",
              stopTimerTime);
        }
      }
    }

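There are two data values that the state API needs to keep:

1. A `loopingTimerTime` value, a Long holding the time (in milliseconds) for which the timer is currently set. It is used to avoid resetting the timer if it is already set to an earlier time.
2. A `key` state value that stores the key we are working with. This information is needed in the `OnTimer` event later.

We also have a Timer with the ID **loopingTimer** that acts as our per-interval alarm clock. Note that the timer is an event-time timer. It fires based on the watermark, not on the passage of time as the pipeline runs.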
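Next, let's unpack what's happening in the `@ProcessElement` block.

The first element to come to this block will:

1. Set the timer to fire one minute after the element's timestamp and record that time in `loopingTimerTime`.
2. Write the key from the key-value pair into the `key` state.
3. Output the element from the `@ProcessElement` block using `c.output`.

Later elements only move the timer if they would make it fire earlier than the time already recorded.

In the `@OnTimer` block, the following occurs:

1. The code emits a value of 0 with the key read from the `key` state.
2. It sets a new timer for one minute after the current firing time, unless that time is no longer before the `stopTimerTime` value. Your use case will normally have more complex stopping conditions, but we use a simple condition here to allow us to keep the illustrated code simple. The topic of stopping conditions is discussed in more detail later.

And that's it, let's add our transform back into the pipeline: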

    // Apply a fixed window of duration 1 min and Sum the results
    p.apply(Create.timestamped(time_1, time_2, time_3)).apply(
        Window.<KV<String, Integer>>into(FixedWindows.<Integer>of(Duration.standardMinutes(1))))
        // We use a combiner to reduce the number of calls in keyed state
        // from all elements to 1 per FixedWindow
        .apply(Sum.integersPerKey())
        .apply(Window.into(new GlobalWindows()))
        .apply(ParDo.of(new LoopingStatefulTimer(Instant.parse("2000-01-01T00:04:00Z"))))
        .apply(Window.into(FixedWindows.of(Duration.standardMinutes(1))))
        .apply(Sum.integersPerKey())
        .apply(ParDo.of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {

          @ProcessElement
          public void process(ProcessContext c) {
            LOG.info("Value is {} timestamp is {}", c.element(), c.timestamp());
          }
        }));

This pipeline ensures that a value of zero exists for each interval window once the timer has started, even if the source of the pipeline also emitted a value within the boundaries of that interval window. This means that we can mark the absence of data.
You might question why we use two reducers with multiple `Sum.integersPerKey` calls. Why not just use one? Functionally, using one would also produce the correct result. However, putting in two `Sum.integersPerKey` steps gives us a nice performance advantage. The first one reduces the number of elements from many to just one per time interval. This can reduce the number of reads of the State API during the `@ProcessElement` calls.
Here is the logging output of running our modified pipeline:

    INFO LoopingTimer - Timer @ 2000-01-01T00:01:59.999Z fired
    INFO LoopingTimer - Timer @ 2000-01-01T00:02:59.999Z fired
    INFO LoopingTimer - Timer @ 2000-01-01T00:03:59.999Z fired
    INFO LoopingTimer - Timer not being set as exceeded Stop Timer value 2000-01-01T00:04:00.000Z
    INFO LoopingTimer - Value is KV{Key_A, 1} timestamp is 2000-01-01T00:00:59.999Z
    INFO LoopingTimer - Value is KV{Key_A, 0} timestamp is 2000-01-01T00:02:59.999Z
    INFO LoopingTimer - Value is KV{Key_A, 2} timestamp is 2000-01-01T00:01:59.999Z
    INFO LoopingTimer - Value is KV{Key_A, 3} timestamp is 2000-01-01T00:03:59.999Z

Yay! We now have output from the time interval [00:02:00, 00:02:59.999), even though the source dataset has no elements in that interval.
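The timer firing times in the log can be reproduced with a small plain-Java simulation. This is a sketch of the looping logic only, not Beam code: `LoopingTimerSimulation` and `firingTimes` are hypothetical names, and the simulation assumes every element has been seen before the first timer fires, which a real runner does not guarantee.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Plain-Java simulation of the looping timer's firing times; it does not use Beam. */
final class LoopingTimerSimulation {

  /**
   * Returns the event times at which the timer fires: first at the earliest element
   * timestamp plus one interval, then once per interval while the next firing time
   * is still before stopTime.
   */
  static List<Instant> firingTimes(
      List<Instant> elementTimestamps, Duration interval, Instant stopTime) {
    List<Instant> firings = new ArrayList<>();
    if (elementTimestamps.isEmpty()) {
      return firings; // No element ever arrived, so no timer was ever set.
    }
    // Mirrors @ProcessElement: the timer ends up at min(element timestamp) + interval.
    Instant next = Collections.min(elementTimestamps).plus(interval);
    while (true) {
      firings.add(next); // Mirrors @OnTimer: a zero is emitted at the firing time.
      Instant following = next.plus(interval);
      if (!following.isBefore(stopTime)) {
        break; // Mirrors the stopTimerTime check: the timer is not set again.
      }
      next = following;
    }
    return firings;
  }

  public static void main(String[] args) {
    // Timestamps of the per-window sums that reach the stateful DoFn.
    List<Instant> sums = List.of(
        Instant.parse("2000-01-01T00:00:59.999Z"),
        Instant.parse("2000-01-01T00:01:59.999Z"),
        Instant.parse("2000-01-01T00:03:59.999Z"));
    System.out.println(
        firingTimes(sums, Duration.ofMinutes(1), Instant.parse("2000-01-01T00:04:00Z")));
  }
}
```

With the three per-window sums as input, the simulated timer fires at 00:01:59.999, 00:02:59.999 and 00:03:59.999, and the next candidate time of 00:04:59.999 is rejected by the stop check, which matches the log.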
In this blog, we covered one of the fun areas around time series use cases and worked through several options, including an advanced use case of the Timer API. Happy looping everyone!
Note: Looping timers are an interesting new use case for the Timer API, and runners will need to add support for it alongside their more advanced feature sets. You can experiment with this pattern today using the DirectRunner. For other runners, please look out for their release notes on support for this use case in production ([Capability Matrix]({{ site.baseurl }}/documentation/runners/capability-matrix/)).
Runner-specific notes: the Google Cloud Dataflow runner's Drain feature does not support looping timers.