blob: 651c242af8ab2a3e5ad934a7677aaac8fc8e0c34 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.cookbook;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterEach;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.joda.time.Duration;
import org.joda.time.Instant;
/**
* This example illustrates the basic concepts behind triggering. It shows how to use different
* trigger definitions to produce partial (speculative) results before all the data is processed and
* to control when updated results are produced for late data. The example performs a streaming
* analysis of the data coming in from a text file and writes the results to BigQuery. It divides
* the data into {@link Window windows} to be processed, and demonstrates using various kinds of
* {@link org.apache.beam.sdk.transforms.windowing.Trigger triggers} to control when the results for
* each window are emitted.
*
* <p>This example uses a portion of real traffic data from San Diego freeways. It contains
* readings from sensor stations set up along each freeway. Each sensor reading includes a
* calculation of the 'total flow' across all lanes in that freeway direction.
*
* <p>Concepts:
* <pre>
* 1. The default triggering behavior
* 2. Late data with the default trigger
* 3. How to get speculative estimates
* 4. Combining late data and speculative estimates
* </pre>
*
* <p>Before running this example, it will be useful to familiarize yourself with Beam triggers
* and understand the concept of 'late data',
* See: <a href="https://beam.apache.org/documentation/programming-guide/#triggers">
* https://beam.apache.org/documentation/programming-guide/#triggers</a>
*
* <p>The example is configured to use the default BigQuery table from the example common package
* (there are no defaults for a general Beam pipeline).
* You can override them by using the {@code --bigQueryDataset}, and {@code --bigQueryTable}
* options. If the BigQuery table do not exist, the example will try to create them.
*
* <p>The pipeline outputs its results to a BigQuery table.
* Here are some queries you can use to see interesting results:
* Replace {@code <enter_table_name>} in the query below with the name of the BigQuery table.
* Replace {@code <enter_window_interval>} in the query below with the window interval.
*
* <p>To see the results of the default trigger,
* Note: When you start up your pipeline, you'll initially see results from 'late' data. Wait after
* the window duration, until the first pane of non-late data has been emitted, to see more
* interesting results.
* {@code SELECT * FROM enter_table_name WHERE trigger_type = "default" ORDER BY window DESC}
*
* <p>To see the late data i.e. dropped by the default trigger,
* {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "withAllowedLateness" and
* (timing = "LATE" or timing = "ON_TIME") and freeway = "5" ORDER BY window DESC, processing_time}
*
* <p>To see the the difference between accumulation mode and discarding mode,
* {@code SELECT * FROM <enter_table_name> WHERE (timing = "LATE" or timing = "ON_TIME") AND
* (trigger_type = "withAllowedLateness" or trigger_type = "sequential") and freeway = "5" ORDER BY
* window DESC, processing_time}
*
* <p>To see speculative results every minute,
* {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "speculative" and freeway = "5"
* ORDER BY window DESC, processing_time}
*
* <p>To see speculative results every five minutes after the end of the window
* {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "sequential" and timing != "EARLY"
* and freeway = "5" ORDER BY window DESC, processing_time}
*
* <p>To see the first and the last pane for a freeway in a window for all the trigger types,
* {@code SELECT * FROM <enter_table_name> WHERE (isFirst = true or isLast = true) ORDER BY window}
*
* <p>To reduce the number of results for each query we can add additional where clauses.
* For examples, To see the results of the default trigger,
* {@code SELECT * FROM <enter_table_name> WHERE trigger_type = "default" AND freeway = "5" AND
* window = "<enter_window_interval>"}
*
* <p>The example will try to cancel the pipelines on the signal to terminate the process (CTRL-C)
* and then exits.
*/
public class TriggerExample {
//Numeric value of fixed window duration, in minutes
public static final int WINDOW_DURATION = 30;
// Constants used in triggers.
// Speeding up ONE_MINUTE or FIVE_MINUTES helps you get an early approximation of results.
// ONE_MINUTE is used only with processing time before the end of the window
public static final Duration ONE_MINUTE = Duration.standardMinutes(1);
// FIVE_MINUTES is used only with processing time after the end of the window
public static final Duration FIVE_MINUTES = Duration.standardMinutes(5);
// ONE_DAY is used to specify the amount of lateness allowed for the data elements.
public static final Duration ONE_DAY = Duration.standardDays(1);
/**
* This transform demonstrates using triggers to control when data is produced for each window
* Consider an example to understand the results generated by each type of trigger.
* The example uses "freeway" as the key. Event time is the timestamp associated with the data
* element and processing time is the time when the data element gets processed in the pipeline.
* For freeway 5, suppose there are 10 elements in the [10:00:00, 10:30:00) window.
* Key (freeway) | Value (total_flow) | event time | processing time
* 5 | 50 | 10:00:03 | 10:00:47
* 5 | 30 | 10:01:00 | 10:01:03
* 5 | 30 | 10:02:00 | 11:07:00
* 5 | 20 | 10:04:10 | 10:05:15
* 5 | 60 | 10:05:00 | 11:03:00
* 5 | 20 | 10:05:01 | 11.07:30
* 5 | 60 | 10:15:00 | 10:27:15
* 5 | 40 | 10:26:40 | 10:26:43
* 5 | 60 | 10:27:20 | 10:27:25
* 5 | 60 | 10:29:00 | 11:11:00
*
* <p>Beam tracks a watermark which records up to what point in event time the data is
* complete. For the purposes of the example, we'll assume the watermark is approximately 15m
* behind the current processing time. In practice, the actual value would vary over time based
* on the systems knowledge of the current delay and contents of the backlog (data
* that has not yet been processed).
*
* <p>If the watermark is 15m behind, then the window [10:00:00, 10:30:00) (in event time) would
* close at 10:44:59, when the watermark passes 10:30:00.
*/
static class CalculateTotalFlow
extends PTransform <PCollection<KV<String, Integer>>, PCollectionList<TableRow>> {
private int windowDuration;
CalculateTotalFlow(int windowDuration) {
this.windowDuration = windowDuration;
}
@Override
public PCollectionList<TableRow> expand(PCollection<KV<String, Integer>> flowInfo) {
// Concept #1: The default triggering behavior
// By default Beam uses a trigger which fires when the watermark has passed the end of the
// window. This would be written {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
// The system also defaults to dropping late data -- data which arrives after the watermark
// has passed the event timestamp of the arriving element. This means that the default trigger
// will only fire once.
// Each pane produced by the default trigger with no allowed lateness will be the first and
// last pane in the window, and will be ON_TIME.
// The results for the example above with the default trigger and zero allowed lateness
// would be:
// Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
// 5 | 260 | 6 | true | true | ON_TIME
// At 11:03:00 (processing time) the system watermark may have advanced to 10:54:00. As a
// result, when the data record with event time 10:05:00 arrives at 11:03:00, it is considered
// late, and dropped.
PCollection<TableRow> defaultTriggerResults = flowInfo
.apply("Default", Window
// The default window duration values work well if you're running the default input
// file. You may want to adjust the window duration otherwise.
.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
// The default trigger first emits output when the system's watermark passes the end
// of the window.
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
// Late data is dropped
.withAllowedLateness(Duration.ZERO)
// Discard elements after emitting each pane.
// With no allowed lateness and the specified trigger there will only be a single
// pane, so this doesn't have a noticeable effect. See concept 2 for more details.
.discardingFiredPanes())
.apply(new TotalFlow("default"));
// Concept #2: Late data with the default trigger
// This uses the same trigger as concept #1, but allows data that is up to ONE_DAY late. This
// leads to each window staying open for ONE_DAY after the watermark has passed the end of the
// window. Any late data will result in an additional pane being fired for that same window.
// The first pane produced will be ON_TIME and the remaining panes will be LATE.
// To definitely get the last pane when the window closes, use
// .withAllowedLateness(ONE_DAY, ClosingBehavior.FIRE_ALWAYS).
// The results for the example above with the default trigger and ONE_DAY allowed lateness
// would be:
// Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
// 5 | 260 | 6 | true | false | ON_TIME
// 5 | 60 | 1 | false | false | LATE
// 5 | 30 | 1 | false | false | LATE
// 5 | 20 | 1 | false | false | LATE
// 5 | 60 | 1 | false | false | LATE
PCollection<TableRow> withAllowedLatenessResults = flowInfo
.apply("WithLateData", Window
.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
// Late data is emitted as it arrives
.triggering(Repeatedly.forever(AfterWatermark.pastEndOfWindow()))
// Once the output is produced, the pane is dropped and we start preparing the next
// pane for the window
.discardingFiredPanes()
// Late data is handled up to one day
.withAllowedLateness(ONE_DAY))
.apply(new TotalFlow("withAllowedLateness"));
// Concept #3: How to get speculative estimates
// We can specify a trigger that fires independent of the watermark, for instance after
// ONE_MINUTE of processing time. This allows us to produce speculative estimates before
// all the data is available. Since we don't have any triggers that depend on the watermark
// we don't get an ON_TIME firing. Instead, all panes are either EARLY or LATE.
// We also use accumulatingFiredPanes to build up the results across each pane firing.
// The results for the example above for this trigger would be:
// Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
// 5 | 80 | 2 | true | false | EARLY
// 5 | 100 | 3 | false | false | EARLY
// 5 | 260 | 6 | false | false | EARLY
// 5 | 320 | 7 | false | false | LATE
// 5 | 370 | 9 | false | false | LATE
// 5 | 430 | 10 | false | false | LATE
PCollection<TableRow> speculativeResults = flowInfo
.apply("Speculative" , Window
.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
// Trigger fires every minute.
.triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
// Speculative every ONE_MINUTE
.plusDelayOf(ONE_MINUTE)))
// After emitting each pane, it will continue accumulating the elements so that each
// approximation includes all of the previous data in addition to the newly arrived
// data.
.accumulatingFiredPanes()
.withAllowedLateness(ONE_DAY))
.apply(new TotalFlow("speculative"));
// Concept #4: Combining late data and speculative estimates
// We can put the previous concepts together to get EARLY estimates, an ON_TIME result,
// and LATE updates based on late data.
// Each time a triggering condition is satisfied it advances to the next trigger.
// If there are new elements this trigger emits a window under following condition:
// > Early approximations every minute till the end of the window.
// > An on-time firing when the watermark has passed the end of the window
// > Every five minutes of late data.
// Every pane produced will either be EARLY, ON_TIME or LATE.
// The results for the example above for this trigger would be:
// Key (freeway) | Value (total_flow) | number_of_records | isFirst | isLast | timing
// 5 | 80 | 2 | true | false | EARLY
// 5 | 100 | 3 | false | false | EARLY
// 5 | 260 | 6 | false | false | EARLY
// [First pane fired after the end of the window]
// 5 | 320 | 7 | false | false | ON_TIME
// 5 | 430 | 10 | false | false | LATE
// For more possibilities of how to build advanced triggers, see {@link Trigger}.
PCollection<TableRow> sequentialResults = flowInfo
.apply("Sequential", Window
.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(windowDuration)))
.triggering(AfterEach.inOrder(
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
// Speculative every ONE_MINUTE
.plusDelayOf(ONE_MINUTE)).orFinally(AfterWatermark.pastEndOfWindow()),
Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
// Late data every FIVE_MINUTES
.plusDelayOf(FIVE_MINUTES))))
.accumulatingFiredPanes()
// For up to ONE_DAY
.withAllowedLateness(ONE_DAY))
.apply(new TotalFlow("sequential"));
// Adds the results generated by each trigger type to a PCollectionList.
PCollectionList<TableRow> resultsList = PCollectionList.of(defaultTriggerResults)
.and(withAllowedLatenessResults)
.and(speculativeResults)
.and(sequentialResults);
return resultsList;
}
}
//////////////////////////////////////////////////////////////////////////////////////////////////
// The remaining parts of the pipeline are needed to produce the output for each
// concept above. Not directly relevant to understanding the trigger examples.
/**
* Calculate total flow and number of records for each freeway and format the results to TableRow
* objects, to save to BigQuery.
*/
static class TotalFlow extends
PTransform <PCollection<KV<String, Integer>>, PCollection<TableRow>> {
private String triggerType;
public TotalFlow(String triggerType) {
this.triggerType = triggerType;
}
@Override
public PCollection<TableRow> expand(PCollection<KV<String, Integer>> flowInfo) {
PCollection<KV<String, Iterable<Integer>>> flowPerFreeway = flowInfo
.apply(GroupByKey.<String, Integer>create());
PCollection<KV<String, String>> results = flowPerFreeway.apply(ParDo.of(
new DoFn<KV<String, Iterable<Integer>>, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Iterable<Integer> flows = c.element().getValue();
Integer sum = 0;
Long numberOfRecords = 0L;
for (Integer value : flows) {
sum += value;
numberOfRecords++;
}
c.output(KV.of(c.element().getKey(), sum + "," + numberOfRecords));
}
}));
PCollection<TableRow> output = results.apply(ParDo.of(new FormatTotalFlow(triggerType)));
return output;
}
}
/**
* Format the results of the Total flow calculation to a TableRow, to save to BigQuery.
* Adds the triggerType, pane information, processing time and the window timestamp.
* */
static class FormatTotalFlow extends DoFn<KV<String, String>, TableRow> {
private String triggerType;
public FormatTotalFlow(String triggerType) {
this.triggerType = triggerType;
}
@ProcessElement
public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
String[] values = c.element().getValue().split(",");
TableRow row = new TableRow()
.set("trigger_type", triggerType)
.set("freeway", c.element().getKey())
.set("total_flow", Integer.parseInt(values[0]))
.set("number_of_records", Long.parseLong(values[1]))
.set("window", window.toString())
.set("isFirst", c.pane().isFirst())
.set("isLast", c.pane().isLast())
.set("timing", c.pane().getTiming().toString())
.set("event_time", c.timestamp().toString())
.set("processing_time", Instant.now().toString());
c.output(row);
}
}
/**
* Extract the freeway and total flow in a reading.
* Freeway is used as key since we are calculating the total flow for each freeway.
*/
static class ExtractFlowInfo extends DoFn<String, KV<String, Integer>> {
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
String[] laneInfo = c.element().split(",");
if (laneInfo[0].equals("timestamp")) {
// Header row
return;
}
if (laneInfo.length < 48) {
//Skip the invalid input.
return;
}
String freeway = laneInfo[2];
Integer totalFlow = tryIntegerParse(laneInfo[7]);
// Ignore the records with total flow 0 to easily understand the working of triggers.
// Skip the records with total flow -1 since they are invalid input.
if (totalFlow == null || totalFlow <= 0) {
return;
}
c.output(KV.of(freeway, totalFlow));
}
}
/**
* Inherits standard configuration options.
*/
public interface TrafficFlowOptions
extends ExampleOptions, ExampleBigQueryTableOptions, StreamingOptions {
@Description("Input file to read from")
@Default.String("gs://apache-beam-samples/traffic_sensor/"
+ "Freeways-5Minaa2010-01-01_to_2010-02-15.csv")
String getInput();
void setInput(String value);
@Description("Numeric value of window duration for fixed windows, in minutes")
@Default.Integer(WINDOW_DURATION)
Integer getWindowDuration();
void setWindowDuration(Integer value);
}
public static void main(String[] args) throws Exception {
TrafficFlowOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(TrafficFlowOptions.class);
options.setStreaming(true);
options.setBigQuerySchema(getSchema());
ExampleUtils exampleUtils = new ExampleUtils(options);
exampleUtils.setup();
Pipeline pipeline = Pipeline.create(options);
TableReference tableRef = getTableReference(options.getProject(),
options.getBigQueryDataset(), options.getBigQueryTable());
PCollectionList<TableRow> resultList = pipeline
.apply("ReadMyFile", TextIO.read().from(options.getInput()))
.apply("InsertRandomDelays", ParDo.of(new InsertDelays()))
.apply(ParDo.of(new ExtractFlowInfo()))
.apply(new CalculateTotalFlow(options.getWindowDuration()));
for (int i = 0; i < resultList.size(); i++){
resultList.get(i).apply(BigQueryIO.writeTableRows()
.to(tableRef)
.withSchema(getSchema()));
}
PipelineResult result = pipeline.run();
// ExampleUtils will try to cancel the pipeline and the injector before the program exits.
exampleUtils.waitToFinish(result);
}
/**
* Add current time to each record.
* Also insert a delay at random to demo the triggers.
*/
public static class InsertDelays extends DoFn<String, String> {
private static final double THRESHOLD = 0.001;
// MIN_DELAY and MAX_DELAY in minutes.
private static final int MIN_DELAY = 1;
private static final int MAX_DELAY = 100;
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
Instant timestamp = Instant.now();
Random random = new Random();
if (random.nextDouble() < THRESHOLD){
int range = MAX_DELAY - MIN_DELAY;
int delayInMinutes = random.nextInt(range) + MIN_DELAY;
long delayInMillis = TimeUnit.MINUTES.toMillis(delayInMinutes);
timestamp = new Instant(timestamp.getMillis() - delayInMillis);
}
c.outputWithTimestamp(c.element(), timestamp);
}
}
/** Sets the table reference. */
private static TableReference getTableReference(String project, String dataset, String table){
TableReference tableRef = new TableReference();
tableRef.setProjectId(project);
tableRef.setDatasetId(dataset);
tableRef.setTableId(table);
return tableRef;
}
/** Defines the BigQuery schema used for the output. */
private static TableSchema getSchema() {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("trigger_type").setType("STRING"));
fields.add(new TableFieldSchema().setName("freeway").setType("STRING"));
fields.add(new TableFieldSchema().setName("total_flow").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("number_of_records").setType("INTEGER"));
fields.add(new TableFieldSchema().setName("window").setType("STRING"));
fields.add(new TableFieldSchema().setName("isFirst").setType("BOOLEAN"));
fields.add(new TableFieldSchema().setName("isLast").setType("BOOLEAN"));
fields.add(new TableFieldSchema().setName("timing").setType("STRING"));
fields.add(new TableFieldSchema().setName("event_time").setType("TIMESTAMP"));
fields.add(new TableFieldSchema().setName("processing_time").setType("TIMESTAMP"));
TableSchema schema = new TableSchema().setFields(fields);
return schema;
}
private static Integer tryIntegerParse(String number) {
try {
return Integer.parseInt(number);
} catch (NumberFormatException e) {
return null;
}
}
}