// blob: 962faa504916031b42a2e8e0bb0845197f582f16 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.stream.sample.cookbook;
import java.util.Date;
import java.util.Objects;
import org.joda.time.Duration;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
import org.apache.apex.malhar.stream.api.WindowedStream;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;
import com.datatorrent.lib.util.KeyValPair;
/**
* This example illustrates the basic concepts behind triggering. It shows how to use different
* trigger definitions to produce partial (speculative) results before all the data is processed and
* to control when updated results are produced for late data. The example performs a streaming
* analysis of the data coming in from PubSub and writes the results to BigQuery. It divides the
* data into {@link WindowOption windows} to be processed, and demonstrates using various kinds of
* {@link TriggerOption triggers} to control when the results for
* each window are emitted.
*
* <p> This example uses a portion of real traffic data from San Diego freeways. It contains
* readings from sensor stations set up along each freeway. Each sensor reading includes a
* calculation of the 'total flow' across all lanes in that freeway direction.
*
* <p> Concepts:
* <pre>
* 1. The default triggering behavior
* 2. Late data with the default trigger
* 3. How to get speculative estimates
* 4. Combining late data and speculative estimates
* </pre>
*
* <p> Before running this example, it will be useful to familiarize yourself with Dataflow triggers
* and understand the concept of 'late data',
* See: <a href="https://cloud.google.com/dataflow/model/triggers">
* https://cloud.google.com/dataflow/model/triggers </a> and
* <a href="https://cloud.google.com/dataflow/model/windowing#Advanced">
* https://cloud.google.com/dataflow/model/windowing#Advanced </a>
*
* <p> The example pipeline reads data from a Pub/Sub topic. By default, running the example will
* also run an auxiliary pipeline to inject data from the default {@code --input} file to the
* {@code --pubsubTopic}. The auxiliary pipeline puts a timestamp on the injected data so that the
* example pipeline can operate on <i>event time</i> (rather than arrival time). The auxiliary
* pipeline also randomly simulates late data, by setting the timestamps of some of the data
* elements to be in the past. You may override the default {@code --input} with the file of your
* choosing or set {@code --input=""} which will disable the automatic Pub/Sub injection, and allow
* you to use a separate tool to publish to the given topic.
*
* <p> The example is configured to use the default Pub/Sub topic and the default BigQuery table
* from the example common package (there are no defaults for a general Dataflow pipeline).
* You can override them by using the {@code --pubsubTopic}, {@code --bigQueryDataset}, and
* {@code --bigQueryTable} options. If the Pub/Sub topic or the BigQuery table do not exist,
* the example will try to create them.
*
* <p> The pipeline outputs its results to a BigQuery table.
* Here are some queries you can use to see interesting results:
* Replace {@code <enter_table_name>} in the query below with the name of the BigQuery table.
* Replace {@code <enter_window_interval>} in the query below with the window interval.
*
* <p> To see the results of the default trigger,
* Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait after
* the window duration, until the first pane of non-late data has been emitted, to see more
* interesting results.
* {@code SELECT * FROM enter_table_name WHERE triggerType = "default" ORDER BY window DESC}
*
* <p> To see the late data, i.e. the data dropped by the default trigger,
* {@code SELECT * FROM <enter_table_name> WHERE triggerType = "withAllowedLateness" and
* (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processingTime}
*
* <p> To see the difference between accumulation mode and discarding mode,
* {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND
* (triggerType = "withAllowedLateness" or triggerType = "sequential") and freeway = "5" ORDER BY
* window DESC, processingTime}
*
* <p> To see speculative results every minute,
* {@code SELECT * FROM <enter_table_name> WHERE triggerType = "speculative" and freeway = "5"
* ORDER BY window DESC, processingTime}
*
* <p> To see speculative results every five minutes after the end of the window
* {@code SELECT * FROM <enter_table_name> WHERE triggerType = "sequential" and timing != "EARLY"
* and freeway = "5" ORDER BY window DESC, processingTime}
*
* <p> To see the first and the last pane for a freeway in a window for all the trigger types,
* {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window}
*
* <p> To reduce the number of results for each query we can add additional where clauses.
* For example, to see the results of the default trigger,
* {@code SELECT * FROM <enter_table_name> WHERE triggerType = "default" AND freeway = "5" AND
* window = "<enter_window_interval>"}
*
* <p> The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
* and then exits.
*
* @since 3.5.0
*/
public class TriggerExample
{
/**
 * Numeric value of the fixed window duration, in minutes.
 * NOTE(review): {@code main} in this class passes the literal 60 to
 * {@code CalculateTotalFlow} rather than this constant -- confirm which value is intended.
 */
public static final int WINDOW_DURATION = 30;
// Constants intended for use in triggers.
// Speeding up ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results.
// NOTE(review): the trigger definitions in CalculateTotalFlow#compose currently build equal
// Duration values inline instead of referencing the three constants below.
/** One minute; meant for processing-time firings before the end of the window. */
public static final Duration ONE_MINUTE = Duration.standardMinutes(1);
/** Five minutes; meant for processing-time firings after the end of the window. */
public static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
/** One day; meant to specify the amount of lateness allowed for the data elements. */
public static final Duration ONE_DAY = Duration.standardDays(1);
/**
* This transform demonstrates using triggers to control when data is produced for each window
* Consider an example to understand the results generated by each type of trigger.
* The example uses "freeway" as the key. Event time is the timestamp associated with the data
* element and processing time is the time when the data element gets processed in the pipeline.
* For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window.
* Key (freeway) | Value (totalFlow) | event time | processing time
* 5 | 50 | 10:00:03 | 10:00:47
* 5 | 30 | 10:01:00 | 10:01:03
* 5 | 30 | 10:02:00 | 11:07:00
* 5 | 20 | 10:04:10 | 10:05:15
* 5 | 60 | 10:05:00 | 11:03:00
* 5 | 20 | 10:05:01 | 11:07:30
* 5 | 60 | 10:15:00 | 10:27:15
* 5 | 40 | 10:26:40 | 10:26:43
* 5 | 60 | 10:27:20 | 10:27:25
* 5 | 60 | 10:29:00 | 11:11:00
*
* <p> Dataflow tracks a watermark which records up to what point in event time the data is
* complete. For the purposes of the example, we'll assume the watermark is approximately 15m
* behind the current processing time. In practice, the actual value would vary over time based
* on the system's knowledge of the current PubSub delay and contents of the backlog (data
* that has not yet been processed).
*
* <p> If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would
* close at 10:44:59, when the watermark passes 10:30:00.
*/
static class CalculateTotalFlow
    extends CompositeStreamTransform<ApexStream<String>, WindowedStream<SampleBean>>
{
  // Length of every window, in minutes.
  private int windowDuration;

  CalculateTotalFlow(int windowDuration)
  {
    this.windowDuration = windowDuration;
  }

  /**
   * Builds a new time-window option spanning {@code windowDuration} minutes. Each pipeline
   * assembled in {@code compose} asks for its own instance.
   */
  private WindowOption.TimeWindows fixedWindows()
  {
    return new WindowOption.TimeWindows(Duration.standardMinutes(windowDuration));
  }

  /**
   * Attaches four differently triggered total-flow computations to {@code inputStream}, one per
   * concept described below. All four are assembled on the input stream, but only the stream
   * built for concept #4 ("sequential") is handed back to the caller; the handles of the other
   * three are not used any further inside this method.
   *
   * @param inputStream stream of raw sensor readings, one line of text per element
   * @return the windowed result stream produced with the "sequential" trigger configuration
   */
  @Override
  public WindowedStream<SampleBean> compose(ApexStream<String> inputStream)
  {
    // ---- Concept #1: the default triggering behavior ----
    // In the Dataflow model this example is modelled on, the default trigger fires once the
    // watermark has passed the end of the window, and late data (data arriving after the
    // watermark has passed its event timestamp) is dropped. The default trigger therefore
    // fires a single time: with no allowed lateness its only pane is both first and last,
    // and it is ON_TIME.
    // Expected output for the sample data of the class comment:
    // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
    // 5             | 260               | 6               | true    | true   | ON_TIME
    // Example of a dropped record: at 11:03:00 (processing time) the watermark may already be
    // at 10:54:00, so the record with event time 10:05:00 arriving then is late and discarded.
    WindowedStream<SampleBean> onTimeOnly = inputStream
        .window(fixedWindows(), new TriggerOption().discardingFiredPanes())
        .addCompositeStreams(new TotalFlow("default"));

    // ---- Concept #2: late data with the default trigger ----
    // Same trigger as concept #1, plus an allowed lateness of one day. Every window stays open
    // for one day after the watermark has passed its end, and each late arrival causes one
    // more pane to be fired for that window. The first pane is ON_TIME, the following panes
    // are LATE. Because fired panes are discarded, each late pane holds only the new data.
    // Expected output for the sample data:
    // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
    // 5             | 260               | 6               | true    | false  | ON_TIME
    // 5             | 60                | 1               | false   | false  | LATE
    // 5             | 30                | 1               | false   | false  | LATE
    // 5             | 20                | 1               | false   | false  | LATE
    // 5             | 60                | 1               | false   | false  | LATE
    WindowedStream<SampleBean> lateTolerant = inputStream
        .window(
            fixedWindows(),
            new TriggerOption().discardingFiredPanes(),
            Duration.standardDays(1))
        .addCompositeStreams(new TotalFlow("withAllowedLateness"));

    // ---- Concept #3: how to get speculative estimates ----
    // A trigger may fire independently of the watermark, here after every minute of processing
    // time, which yields estimates before all the data is available. As described for the
    // original example, without a watermark-based trigger there is no ON_TIME firing: every
    // pane is EARLY or LATE. Accumulating fired panes makes each estimate include all data
    // seen so far in the window, not just the newly arrived elements.
    // Expected output for the sample data:
    // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
    // 5             | 80                | 2               | true    | false  | EARLY
    // 5             | 100               | 3               | false   | false  | EARLY
    // 5             | 260               | 6               | false   | false  | EARLY
    // 5             | 320               | 7               | false   | false  | LATE
    // 5             | 370               | 9               | false   | false  | LATE
    // 5             | 430               | 10              | false   | false  | LATE
    ApexStream<SampleBean> earlyEstimates = inputStream
        .window(
            fixedWindows(),
            // fire once per minute, keeping what was already emitted
            new TriggerOption()
                .withEarlyFiringsAtEvery(Duration.standardMinutes(1))
                .accumulatingFiredPanes(),
            Duration.standardDays(1))
        .addCompositeStreams(new TotalFlow("speculative"));

    // ---- Concept #4: combining late data and speculative estimates ----
    // The previous concepts combined: EARLY estimates, an ON_TIME result and LATE updates.
    // When new elements are present, a pane is emitted
    //   > every minute until the end of the window (early approximations),
    //   > once when the watermark has passed the end of the window (on-time firing),
    //   > every five minutes for late data.
    // Each pane is therefore EARLY, ON_TIME or LATE.
    // Expected output for the sample data:
    // Key (freeway) | Value (totalFlow) | numberOfRecords | isFirst | isLast | timing
    // 5             | 80                | 2               | true    | false  | EARLY
    // 5             | 100               | 3               | false   | false  | EARLY
    // 5             | 260               | 6               | false   | false  | EARLY
    // [First pane fired after the end of the window]
    // 5             | 320               | 7               | false   | false  | ON_TIME
    // 5             | 430               | 10              | false   | false  | LATE
    WindowedStream<SampleBean> earlyOnTimeAndLate = inputStream
        .window(
            fixedWindows(),
            // speculative pane every minute, late pane every five minutes, accumulating
            new TriggerOption()
                .withEarlyFiringsAtEvery(Duration.standardMinutes(1))
                .withLateFiringsAtEvery(Duration.standardMinutes(5))
                .accumulatingFiredPanes(),
            Duration.standardDays(1))
        .addCompositeStreams(new TotalFlow("sequential"));

    return earlyOnTimeAndLate;
  }
}
//////////////////////////////////////////////////////////////////////////////////////////////////
// The remaining parts of the pipeline are needed to produce the output for each
// concept above. Not directly relevant to understanding the trigger examples.
/**
 * Calculates the total flow and the number of records for each freeway and converts every
 * per-freeway result into a {@link SampleBean}.
 *
 * <p>The input lines are keyed and parsed by {@link ExtractFlowInfo}; the grouped values are
 * reduced to the text {@code "<sum>,<count>"}, which {@link FormatTotalFlow} turns into the bean.
 */
static class TotalFlow extends
    CompositeStreamTransform<WindowedStream<String>, WindowedStream<SampleBean>>
{
  // Label copied into every emitted bean so the results of the different triggers can be told apart.
  private String triggerType;

  /**
   * @param triggerType label identifying the trigger configuration that feeds this transform
   */
  public TotalFlow(String triggerType)
  {
    this.triggerType = triggerType;
  }

  /**
   * @param inputStream windowed stream of raw reading lines
   * @return windowed stream with one {@link SampleBean} per grouped freeway result
   */
  @Override
  public WindowedStream<SampleBean> compose(WindowedStream<String> inputStream)
  {
    WindowedStream<KeyValPair<String, Iterable<Integer>>> flowPerFreeway = inputStream
        .groupByKey(new ExtractFlowInfo());
    return flowPerFreeway
        .map(new Function.MapFunction<KeyValPair<String, Iterable<Integer>>, KeyValPair<String, String>>()
        {
          @Override
          public KeyValPair<String, String> f(KeyValPair<String, Iterable<Integer>> input)
          {
            // Primitive accumulators: the boxed Integer/Long used before were unboxed and
            // re-boxed on every iteration. The concatenated text below is unchanged.
            int sum = 0;
            long numberOfRecords = 0L;
            for (int value : input.getValue()) {
              sum += value;
              numberOfRecords++;
            }
            return new KeyValPair<>(input.getKey(), sum + "," + numberOfRecords);
          }
        })
        .map(new FormatTotalFlow(triggerType));
  }
}
/**
 * Turns one {@code (freeway, "<totalFlow>,<numberOfRecords>")} pair into a {@link SampleBean}
 * tagged with the trigger type.
 *
 * <p>Only the trigger type, freeway, total flow, record count and processing time are filled in.
 * Window, timing and event time are passed as {@code null} and both pane flags as {@code false},
 * because that metadata is not available to this function yet (see the TODO below).
 */
static class FormatTotalFlow implements Function.MapFunction<KeyValPair<String, String>, SampleBean>
{
  private String triggerType;

  public FormatTotalFlow(String triggerType)
  {
    this.triggerType = triggerType;
  }

  /**
   * @param input freeway as key, comma-separated total flow and record count as value
   * @return bean whose processing time is the moment of this call
   */
  @Override
  public SampleBean f(KeyValPair<String, String> input)
  {
    final String[] parts = input.getValue().split(",");
    final int totalFlow = Integer.parseInt(parts[0]);
    final long recordCount = Long.parseLong(parts[1]);
    //TODO need to have a callback to get the metadata like window id, pane id, timestamps etc.
    return new SampleBean(
        triggerType,
        input.getKey(),
        totalFlow,
        recordCount,
        null,
        false,
        false,
        null,
        null,
        new Date());
  }
}
/**
 * Mutable result bean: one row of output per freeway and fired pane.
 *
 * <p>Equality and hash code take all ten properties into account. {@link #toString()} lists
 * every property so that printed or logged results are readable.
 */
public static class SampleBean
{
  private String triggerType;
  private String freeway;
  private int totalFlow;
  private long numberOfRecords;
  private String window;
  private boolean isFirst;
  private boolean isLast;
  private Date timing;
  private Date eventTime;
  private Date processingTime;

  /** Creates an empty bean; populate it through the setters. */
  public SampleBean()
  {
  }

  /** Creates a bean with every property set; each argument is stored as given. */
  public SampleBean(String triggerType, String freeway, int totalFlow, long numberOfRecords, String window, boolean isFirst, boolean isLast, Date timing, Date eventTime, Date processingTime)
  {
    this.triggerType = triggerType;
    this.freeway = freeway;
    this.totalFlow = totalFlow;
    this.numberOfRecords = numberOfRecords;
    this.window = window;
    this.isFirst = isFirst;
    this.isLast = isLast;
    this.timing = timing;
    this.eventTime = eventTime;
    this.processingTime = processingTime;
  }

  @Override
  public boolean equals(Object o)
  {
    if (this == o) {
      return true;
    }
    if (o == null || getClass() != o.getClass()) {
      return false;
    }
    SampleBean that = (SampleBean)o;
    return totalFlow == that.totalFlow &&
        numberOfRecords == that.numberOfRecords &&
        isFirst == that.isFirst &&
        isLast == that.isLast &&
        Objects.equals(triggerType, that.triggerType) &&
        Objects.equals(freeway, that.freeway) &&
        Objects.equals(window, that.window) &&
        Objects.equals(timing, that.timing) &&
        Objects.equals(eventTime, that.eventTime) &&
        Objects.equals(processingTime, that.processingTime);
  }

  @Override
  public int hashCode()
  {
    return Objects
        .hash(triggerType, freeway, totalFlow, numberOfRecords, window, isFirst, isLast, timing, eventTime,
        processingTime);
  }

  /**
   * Lists all properties in declaration order; unset references are rendered as {@code null}.
   */
  @Override
  public String toString()
  {
    return "SampleBean{triggerType=" + triggerType
        + ", freeway=" + freeway
        + ", totalFlow=" + totalFlow
        + ", numberOfRecords=" + numberOfRecords
        + ", window=" + window
        + ", isFirst=" + isFirst
        + ", isLast=" + isLast
        + ", timing=" + timing
        + ", eventTime=" + eventTime
        + ", processingTime=" + processingTime
        + "}";
  }

  public String getTriggerType()
  {
    return triggerType;
  }

  public void setTriggerType(String triggerType)
  {
    this.triggerType = triggerType;
  }

  public String getFreeway()
  {
    return freeway;
  }

  public void setFreeway(String freeway)
  {
    this.freeway = freeway;
  }

  public int getTotalFlow()
  {
    return totalFlow;
  }

  public void setTotalFlow(int totalFlow)
  {
    this.totalFlow = totalFlow;
  }

  public long getNumberOfRecords()
  {
    return numberOfRecords;
  }

  public void setNumberOfRecords(long numberOfRecords)
  {
    this.numberOfRecords = numberOfRecords;
  }

  public String getWindow()
  {
    return window;
  }

  public void setWindow(String window)
  {
    this.window = window;
  }

  public boolean isFirst()
  {
    return isFirst;
  }

  public void setFirst(boolean first)
  {
    isFirst = first;
  }

  public boolean isLast()
  {
    return isLast;
  }

  public void setLast(boolean last)
  {
    isLast = last;
  }

  public Date getTiming()
  {
    return timing;
  }

  public void setTiming(Date timing)
  {
    this.timing = timing;
  }

  public Date getEventTime()
  {
    return eventTime;
  }

  public void setEventTime(Date eventTime)
  {
    this.eventTime = eventTime;
  }

  public Date getProcessingTime()
  {
    return processingTime;
  }

  public void setProcessingTime(Date processingTime)
  {
    this.processingTime = processingTime;
  }
}
/**
 * Extract the freeway and total flow in a reading.
 * Freeway is used as key since we are calculating the total flow for each freeway.
 *
 * <p>A reading is one comma-separated line. {@code null} is returned for every line that is
 * not turned into a key/value pair: a {@code null} line, a line with fewer than
 * {@link #MIN_FIELD_COUNT} fields, the header row, and readings whose total flow is missing,
 * not an integer, zero or negative.
 */
static class ExtractFlowInfo implements Function.ToKeyValue<String, String, Integer>
{
  /** Minimum number of comma-separated fields a line must have to be considered. */
  private static final int MIN_FIELD_COUNT = 48;
  /** Position of the freeway identifier within a line. */
  private static final int FREEWAY_INDEX = 2;
  /** Position of the total flow value within a line. */
  private static final int TOTAL_FLOW_INDEX = 7;
  /** Content of the first field of the header row. */
  private static final String HEADER_FIRST_FIELD = "timestamp";

  /**
   * @param input one line of sensor data
   * @return the (freeway, totalFlow) pair wrapped in a plain tuple, or {@code null} if the
   *         line is skipped
   */
  @Override
  public Tuple<KeyValPair<String, Integer>> f(String input)
  {
    if (input == null) {
      return null;
    }
    String[] laneInfo = input.split(",");
    // Check the length before touching any element: String.split drops trailing empty
    // strings, so a line made only of commas yields an empty array and reading element 0
    // would throw ArrayIndexOutOfBoundsException.
    if (laneInfo.length < MIN_FIELD_COUNT) {
      //Skip the invalid input.
      return null;
    }
    if (HEADER_FIRST_FIELD.equals(laneInfo[0])) {
      // Header row
      return null;
    }
    String freeway = laneInfo[FREEWAY_INDEX];
    Integer totalFlow = tryIntegerParse(laneInfo[TOTAL_FLOW_INDEX]);
    // Ignore the records with total flow 0 to easily understand the working of triggers.
    // Skip the records with total flow -1 since they are invalid input.
    if (totalFlow == null || totalFlow <= 0) {
      return null;
    }
    return new Tuple.PlainTuple<>(new KeyValPair<>(freeway, totalFlow));
  }
}
// Label key "timestamp_ms".
// NOTE(review): no code in this class reads this constant -- it looks unused; confirm
// before removing.
private static final String PUBSUB_TIMESTAMP_LABEL_KEY = "timestamp_ms";
/**
 * Assembles the example: creates a stream from the folder named {@code "some folder"} and
 * attaches a {@link CalculateTotalFlow} transform with a window duration of 60 minutes.
 * The command-line arguments are not read, and this method contains no explicit call that
 * launches the assembled stream.
 *
 * @param args ignored
 */
public static void main(String[] args) throws Exception
{
  final CalculateTotalFlow totalFlowTransform = new CalculateTotalFlow(60);
  StreamFactory
      .fromFolder("some folder")
      .addCompositeStreams(totalFlowTransform);
}
/**
 * Parses {@code number} as a decimal int without throwing.
 *
 * @param number text to parse; may be {@code null}
 * @return the parsed value, or {@code null} when {@code number} is not a valid int
 */
private static Integer tryIntegerParse(String number)
{
  Integer parsed;
  try {
    parsed = Integer.valueOf(number);
  } catch (NumberFormatException ignored) {
    // Not a number (this includes null input): report "no value" instead of failing.
    parsed = null;
  }
  return parsed;
}
}