/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.complete;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.avro.reflect.Nullable;
import org.apache.beam.examples.common.ExampleBigQueryTableOptions;
import org.apache.beam.examples.common.ExampleOptions;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
/**
* A Beam example that runs in both batch and streaming modes with traffic sensor data.
* You can configure the running mode by setting {@literal --streaming} to true or false.
*
* <p>Concepts: The batch and streaming runners, GroupByKey, sliding windows.
*
* <p>This example analyzes traffic sensor data using SlidingWindows. For each window,
* it calculates the average speed over the window for some small set of predefined 'routes',
* and looks for 'slowdowns' in those routes. It writes its results to a BigQuery table.
*
* <p>The pipeline reads traffic sensor data from {@literal --inputFile}.
*
* <p>The example is configured to use the default BigQuery table from the example common package
* (there are no defaults for a general Beam pipeline).
* You can override them with the {@literal --bigQueryDataset} and {@literal --bigQueryTable}
* options. If the BigQuery table does not exist, the example will try to create it.
*
* <p>The example will try to cancel the pipeline when the process receives a termination
* signal (CTRL-C), and then exit.
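*
* <p>As a rough sketch, an invocation might look like the following; the placeholder values
* and the Maven exec plugin setup are assumptions that depend on your environment and runner:
* <pre>{@code
* mvn compile exec:java \
*     -Dexec.mainClass=org.apache.beam.examples.complete.TrafficRoutes \
*     -Dexec.args="--project=<YOUR_PROJECT> \
*                  --bigQueryDataset=<YOUR_DATASET> \
*                  --bigQueryTable=<YOUR_TABLE> \
*                  --windowDuration=3 \
*                  --windowSlideEvery=1"
* }</pre>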
*/
public class TrafficRoutes {
// Instantiate some small predefined San Diego routes to analyze
static Map<String, String> sdStations = buildStationInfo();
static final int WINDOW_DURATION = 3; // Default sliding window duration in minutes
static final int WINDOW_SLIDE_EVERY = 1; // Default window 'slide every' setting in minutes
/**
* This class holds information about a station reading's average speed.
*/
@DefaultCoder(AvroCoder.class)
static class StationSpeed implements Comparable<StationSpeed> {
@Nullable String stationId;
@Nullable Double avgSpeed;
@Nullable Long timestamp;
public StationSpeed() {}
public StationSpeed(String stationId, Double avgSpeed, Long timestamp) {
this.stationId = stationId;
this.avgSpeed = avgSpeed;
this.timestamp = timestamp;
}
public String getStationId() {
return this.stationId;
}
public Double getAvgSpeed() {
return this.avgSpeed;
}
@Override
public int compareTo(StationSpeed other) {
return Long.compare(this.timestamp, other.timestamp);
}
}
/**
* This class holds information about a route's speed/slowdown.
*/
@DefaultCoder(AvroCoder.class)
static class RouteInfo {
@Nullable String route;
@Nullable Double avgSpeed;
@Nullable Boolean slowdownEvent;
public RouteInfo() {}
public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent) {
this.route = route;
this.avgSpeed = avgSpeed;
this.slowdownEvent = slowdownEvent;
}
public String getRoute() {
return this.route;
}
public Double getAvgSpeed() {
return this.avgSpeed;
}
public Boolean getSlowdownEvent() {
return this.slowdownEvent;
}
}
/**
* Extract the timestamp field (the first comma-separated field, in MM/dd/yyyy HH:mm:ss format)
* from the input string, and use it as the element timestamp.
*/
static class ExtractTimestamps extends DoFn<String, String> {
private static final DateTimeFormatter dateTimeFormat =
DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
@ProcessElement
public void processElement(ProcessContext c) {
String[] items = c.element().split(",");
String timestamp = tryParseTimestamp(items);
if (timestamp != null) {
try {
c.outputWithTimestamp(c.element(), new Instant(dateTimeFormat.parseMillis(timestamp)));
} catch (IllegalArgumentException e) {
// Skip the invalid input.
}
}
}
}
/**
* Keep only readings from stations along predefined 'routes', and output each reading's
* speed info keyed on its route.
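*
* <p>The station type, station ID, and average speed are read from CSV fields 4, 1, and 9,
* respectively (see the tryParse helpers at the bottom of the enclosing class).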
*/
static class ExtractStationSpeedFn extends DoFn<String, KV<String, StationSpeed>> {
@ProcessElement
public void processElement(ProcessContext c) {
String[] items = c.element().split(",");
String stationType = tryParseStationType(items);
// For this analysis, use only 'main line' station types
if (stationType != null && stationType.equals("ML")) {
Double avgSpeed = tryParseAvgSpeed(items);
String stationId = tryParseStationId(items);
// For this simple example, filter out everything but some hardwired routes.
if (avgSpeed != null && stationId != null && sdStations.containsKey(stationId)) {
StationSpeed stationSpeed =
new StationSpeed(stationId, avgSpeed, c.timestamp().getMillis());
// The tuple key is the 'route' name stored in the 'sdStations' hash.
KV<String, StationSpeed> outputValue = KV.of(sdStations.get(stationId), stationSpeed);
c.output(outputValue);
}
}
}
}
/**
* For a given route, track average speed for the window. Calculate whether
* traffic is currently slowing down, via a predefined threshold. If a supermajority of
* speeds in this sliding window are less than the previous reading for the same station,
* we call this a 'slowdown'.
* Note: these calculations are for example purposes only, and are unrealistic and oversimplified.
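*
* <p>For example, if a window holds per-station reading pairs 60&rarr;55, 62&rarr;58, and
* 64&rarr;66 mph, the transitions give slowdowns = 2 and speedups = 1; since 2 &gt;= 2 * 1,
* the window is flagged as a slowdown event.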
*/
static class GatherStats
extends DoFn<KV<String, Iterable<StationSpeed>>, KV<String, RouteInfo>> {
@ProcessElement
public void processElement(ProcessContext c) throws IOException {
String route = c.element().getKey();
double speedSum = 0.0;
int speedCount = 0;
int speedups = 0;
int slowdowns = 0;
List<StationSpeed> infoList = Lists.newArrayList(c.element().getValue());
// StationSpeeds sort by embedded timestamp.
Collections.sort(infoList);
Map<String, Double> prevSpeeds = new HashMap<>();
// For all stations in the route, sum (non-null) speeds. Keep a count of the non-null speeds.
for (StationSpeed item : infoList) {
Double speed = item.getAvgSpeed();
if (speed != null) {
speedSum += speed;
speedCount++;
Double lastSpeed = prevSpeeds.get(item.getStationId());
if (lastSpeed != null) {
if (lastSpeed < speed) {
speedups += 1;
} else {
slowdowns += 1;
}
}
prevSpeeds.put(item.getStationId(), speed);
}
}
if (speedCount == 0) {
// No average to compute.
return;
}
double speedAvg = speedSum / speedCount;
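// Flag a slowdown when slowdown transitions outnumber speedup transitions at least two to one.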
boolean slowdownEvent = slowdowns >= 2 * speedups;
RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent);
c.output(KV.of(route, routeInfo));
}
}
/**
* Format the results of the slowdown calculations as TableRows, to save to BigQuery.
*/
static class FormatStatsFn extends DoFn<KV<String, RouteInfo>, TableRow> {
@ProcessElement
public void processElement(ProcessContext c) {
RouteInfo routeInfo = c.element().getValue();
TableRow row = new TableRow()
.set("avg_speed", routeInfo.getAvgSpeed())
.set("slowdown_event", routeInfo.getSlowdownEvent())
.set("route", c.element().getKey())
.set("window_timestamp", c.timestamp().toString());
c.output(row);
}
/**
* Defines the BigQuery schema used for the output.
*/
static TableSchema getSchema() {
List<TableFieldSchema> fields = new ArrayList<>();
fields.add(new TableFieldSchema().setName("route").setType("STRING"));
fields.add(new TableFieldSchema().setName("avg_speed").setType("FLOAT"));
fields.add(new TableFieldSchema().setName("slowdown_event").setType("BOOLEAN"));
fields.add(new TableFieldSchema().setName("window_timestamp").setType("TIMESTAMP"));
TableSchema schema = new TableSchema().setFields(fields);
return schema;
}
}
/**
* This PTransform extracts speed info from traffic station readings.
* It groups the readings by 'route' and analyzes traffic slowdown for that route.
* Lastly, it formats the results for BigQuery.
*/
static class TrackSpeed extends
PTransform<PCollection<KV<String, StationSpeed>>, PCollection<TableRow>> {
@Override
public PCollection<TableRow> expand(PCollection<KV<String, StationSpeed>> stationSpeed) {
// Apply a GroupByKey transform to collect a list of all station
// readings for a given route.
PCollection<KV<String, Iterable<StationSpeed>>> timeGroup = stationSpeed.apply(
GroupByKey.<String, StationSpeed>create());
// Analyze 'slowdown' over the route readings.
PCollection<KV<String, RouteInfo>> stats = timeGroup.apply(ParDo.of(new GatherStats()));
// Format the results for writing to BigQuery.
PCollection<TableRow> results = stats.apply(
ParDo.of(new FormatStatsFn()));
return results;
}
}
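/**
* Reads the traffic sensor input file and assigns each line an event timestamp parsed from its
* first field.
*/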
static class ReadFileAndExtractTimestamps extends PTransform<PBegin, PCollection<String>> {
private final String inputFile;
public ReadFileAndExtractTimestamps(String inputFile) {
this.inputFile = inputFile;
}
@Override
public PCollection<String> expand(PBegin begin) {
return begin
.apply(TextIO.read().from(inputFile))
.apply(ParDo.of(new ExtractTimestamps()));
}
}
/**
* Options supported by {@link TrafficRoutes}.
*
* <p>Inherits standard configuration options.
*/
public interface TrafficRoutesOptions extends ExampleOptions, ExampleBigQueryTableOptions {
@Description("Path of the file to read from")
@Default.String("gs://apache-beam-samples/traffic_sensor/"
+ "Freeways-5Minaa2010-01-01_to_2010-02-15_test2.csv")
String getInputFile();
void setInputFile(String value);
@Description("Numeric value of sliding window duration, in minutes")
@Default.Integer(WINDOW_DURATION)
Integer getWindowDuration();
void setWindowDuration(Integer value);
@Description("Numeric value of window 'slide every' setting, in minutes")
@Default.Integer(WINDOW_SLIDE_EVERY)
Integer getWindowSlideEvery();
void setWindowSlideEvery(Integer value);
}
/**
* Sets up and starts the pipeline, in batch or streaming mode depending on the
* {@literal --streaming} option.
*
* @throws IOException if there is a problem setting up resources
*/
public static void main(String[] args) throws IOException {
TrafficRoutesOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(TrafficRoutesOptions.class);
options.setBigQuerySchema(FormatStatsFn.getSchema());
// Using ExampleUtils to set up required resources.
ExampleUtils exampleUtils = new ExampleUtils(options);
exampleUtils.setup();
Pipeline pipeline = Pipeline.create(options);
TableReference tableRef = new TableReference();
tableRef.setProjectId(options.getProject());
tableRef.setDatasetId(options.getBigQueryDataset());
tableRef.setTableId(options.getBigQueryTable());
pipeline
.apply("ReadLines", new ReadFileAndExtractTimestamps(options.getInputFile()))
// Convert each input row into a (route, StationSpeed) pair.
.apply(ParDo.of(new ExtractStationSpeedFn()))
// Map the incoming data into sliding windows. With the defaults, a new three-minute
// window starts every minute, so each element lands in three overlapping windows.
.apply(Window.<KV<String, StationSpeed>>into(
SlidingWindows.of(Duration.standardMinutes(options.getWindowDuration()))
.every(Duration.standardMinutes(options.getWindowSlideEvery()))))
.apply(new TrackSpeed())
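// Write the rows to BigQuery. With the default CREATE_IF_NEEDED create disposition,
// the table is created from the schema below if it does not already exist.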
.apply(BigQueryIO.writeTableRows().to(tableRef)
.withSchema(FormatStatsFn.getSchema()));
// Run the pipeline.
PipelineResult result = pipeline.run();
// ExampleUtils will try to cancel the pipeline and the injector before the program exits.
exampleUtils.waitToFinish(result);
}
private static Double tryParseAvgSpeed(String[] inputItems) {
try {
return Double.parseDouble(tryParseString(inputItems, 9));
} catch (NumberFormatException | NullPointerException e) {
return null;
}
}
private static String tryParseStationType(String[] inputItems) {
return tryParseString(inputItems, 4);
}
private static String tryParseStationId(String[] inputItems) {
return tryParseString(inputItems, 1);
}
private static String tryParseTimestamp(String[] inputItems) {
return tryParseString(inputItems, 0);
}
private static String tryParseString(String[] inputItems, int index) {
// The index must be strictly less than the array length; '>=' would read past short rows.
return inputItems.length > index ? inputItems[index] : null;
}
/**
* Define some small hard-wired San Diego 'routes' to track based on sensor station ID.
*/
private static Map<String, String> buildStationInfo() {
Map<String, String> stations = new HashMap<>();
stations.put("1108413", "SDRoute1"); // from freeway 805 S
stations.put("1108699", "SDRoute2"); // from freeway 78 E
stations.put("1108702", "SDRoute2");
return stations;
}
}