blob: 9deb366646c55ce2b18a13976ec16e4d22e88fe0 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.apex.malhar.stream.sample.complete;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.joda.time.Duration;
import org.apache.apex.malhar.lib.window.TriggerOption;
import org.apache.apex.malhar.lib.window.Tuple;
import org.apache.apex.malhar.lib.window.WindowOption;
import org.apache.apex.malhar.lib.window.accumulation.Group;
import org.apache.apex.malhar.stream.api.ApexStream;
import org.apache.apex.malhar.stream.api.CompositeStreamTransform;
import org.apache.apex.malhar.stream.api.WindowedStream;
import org.apache.apex.malhar.stream.api.function.Function;
import org.apache.apex.malhar.stream.api.impl.StreamFactory;
import org.apache.hadoop.conf.Configuration;
import com.google.common.collect.Lists;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.util.KeyValPair;
import static org.apache.apex.malhar.stream.api.Option.Options.name;
/**
* Beam's TrafficRoutes example.
*
* @since 3.5.0
*/
@ApplicationAnnotation(name = "TrafficRoutes")
public class TrafficRoutes implements StreamingApplication
{
static Map<String, String> sdStations = buildStationInfo();
static final int WINDOW_DURATION = 3; // Default sliding window duration in minutes
static final int WINDOW_SLIDE_EVERY = 1; // Default window 'slide every' setting in minutes
/**
* This class holds information about a station reading's average speed.
*/
public static class StationSpeed implements Comparable<StationSpeed>
{
@Nullable
String stationId;
@Nullable
Double avgSpeed;
@Nullable
Long timestamp;
public StationSpeed() {}
public StationSpeed(String stationId, Double avgSpeed, Long timestamp)
{
this.stationId = stationId;
this.avgSpeed = avgSpeed;
this.timestamp = timestamp;
}
public void setAvgSpeed(@Nullable Double avgSpeed)
{
this.avgSpeed = avgSpeed;
}
public void setStationId(@Nullable String stationId)
{
this.stationId = stationId;
}
public void setTimestamp(@Nullable Long timestamp)
{
this.timestamp = timestamp;
}
@Nullable
public Long getTimestamp()
{
return timestamp;
}
public String getStationId()
{
return this.stationId;
}
public Double getAvgSpeed()
{
return this.avgSpeed;
}
@Override
public int compareTo(StationSpeed other)
{
return Long.compare(this.timestamp, other.timestamp);
}
}
/**
* This class holds information about a route's speed/slowdown.
*/
static class RouteInfo
{
@Nullable
String route;
@Nullable
Double avgSpeed;
@Nullable
Boolean slowdownEvent;
public RouteInfo()
{
}
public RouteInfo(String route, Double avgSpeed, Boolean slowdownEvent)
{
this.route = route;
this.avgSpeed = avgSpeed;
this.slowdownEvent = slowdownEvent;
}
public String getRoute()
{
return this.route;
}
public Double getAvgSpeed()
{
return this.avgSpeed;
}
public Boolean getSlowdownEvent()
{
return this.slowdownEvent;
}
}
/**
* Extract the timestamp field from the input string, and wrap the input string in a {@link Tuple.TimestampedTuple}
* with the extracted timestamp.
*/
static class ExtractTimestamps implements Function.MapFunction<String, Tuple.TimestampedTuple<String>>
{
@Override
public Tuple.TimestampedTuple<String> f(String input)
{
String[] items = input.split(",");
String timestamp = tryParseTimestamp(items);
return new Tuple.TimestampedTuple<>(Long.parseLong(timestamp), input);
}
}
/**
* Filter out readings for the stations along predefined 'routes', and output
* (station, speed info) keyed on route.
*/
static class ExtractStationSpeedFn implements Function.FlatMapFunction<Tuple.TimestampedTuple<String>, KeyValPair<String, StationSpeed>>
{
@Override
public Iterable<KeyValPair<String, StationSpeed>> f(Tuple.TimestampedTuple<String> input)
{
ArrayList<KeyValPair<String, StationSpeed>> result = new ArrayList<>();
String[] items = input.getValue().split(",");
String stationType = tryParseStationType(items);
// For this analysis, use only 'main line' station types
if (stationType != null && stationType.equals("ML")) {
Double avgSpeed = tryParseAvgSpeed(items);
String stationId = tryParseStationId(items);
// For this simple example, filter out everything but some hardwired routes.
if (avgSpeed != null && stationId != null && sdStations.containsKey(stationId)) {
StationSpeed stationSpeed =
new StationSpeed(stationId, avgSpeed, input.getTimestamp());
// The tuple key is the 'route' name stored in the 'sdStations' hash.
KeyValPair<String, StationSpeed> outputValue = new KeyValPair<>(sdStations.get(stationId), stationSpeed);
result.add(outputValue);
}
}
return result;
}
}
/**
* For a given route, track average speed for the window. Calculate whether
* traffic is currently slowing down, via a predefined threshold. If a supermajority of
* speeds in this sliding window are less than the previous reading we call this a 'slowdown'.
* Note: these calculations are for example purposes only, and are unrealistic and oversimplified.
*/
static class GatherStats
implements Function.FlatMapFunction<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>, Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>>
{
@Override
public Iterable<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> f(Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>> input)
{
ArrayList<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> result = new ArrayList<>();
String route = input.getValue().getKey();
double speedSum = 0.0;
int speedCount = 0;
int speedups = 0;
int slowdowns = 0;
List<StationSpeed> infoList = Lists.newArrayList(input.getValue().getValue());
// StationSpeeds sort by embedded timestamp.
Collections.sort(infoList);
Map<String, Double> prevSpeeds = new HashMap<>();
// For all stations in the route, sum (non-null) speeds. Keep a count of the non-null speeds.
for (StationSpeed item : infoList) {
Double speed = item.getAvgSpeed();
if (speed != null) {
speedSum += speed;
speedCount++;
Double lastSpeed = prevSpeeds.get(item.getStationId());
if (lastSpeed != null) {
if (lastSpeed < speed) {
speedups += 1;
} else {
slowdowns += 1;
}
}
prevSpeeds.put(item.getStationId(), speed);
}
}
if (speedCount == 0) {
// No average to compute.
return result;
}
double speedAvg = speedSum / speedCount;
boolean slowdownEvent = slowdowns >= 2 * speedups;
RouteInfo routeInfo = new RouteInfo(route, speedAvg, slowdownEvent);
result.add(new Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>(input.getTimestamp(), new KeyValPair<String, RouteInfo>(route, routeInfo)));
return result;
}
}
/**
* Output Pojo class for outputting result to JDBC.
*/
static class OutputPojo
{
private Double avgSpeed;
private Boolean slowdownEvent;
private String key;
private Long timestamp;
public OutputPojo()
{
}
public OutputPojo(Double avgSpeed, Boolean slowdownEvent, String key, Long timestamp)
{
this.avgSpeed = avgSpeed;
this.slowdownEvent = slowdownEvent;
this.key = key;
this.timestamp = timestamp;
}
@Override
public String toString()
{
return key + " + " + avgSpeed + " + " + slowdownEvent + " + " + timestamp;
}
public void setTimestamp(Long timestamp)
{
this.timestamp = timestamp;
}
public Long getTimestamp()
{
return timestamp;
}
public void setAvgSpeed(Double avgSpeed)
{
this.avgSpeed = avgSpeed;
}
public Double getAvgSpeed()
{
return avgSpeed;
}
public void setKey(String key)
{
this.key = key;
}
public String getKey()
{
return key;
}
public void setSlowdownEvent(Boolean slowdownEvent)
{
this.slowdownEvent = slowdownEvent;
}
public Boolean getSlowdownEvent()
{
return slowdownEvent;
}
}
public static class Collector extends BaseOperator
{
private static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> result = new HashMap<>();
public static Map<KeyValPair<Long, String>, KeyValPair<Double, Boolean>> getResult()
{
return result;
}
public final transient DefaultInputPort<OutputPojo> input = new DefaultInputPort<OutputPojo>()
{
@Override
public void process(OutputPojo tuple)
{
result.put(new KeyValPair<Long, String>(tuple.getTimestamp(), tuple.getKey()), new KeyValPair<Double, Boolean>(tuple.getAvgSpeed(), tuple.getSlowdownEvent()));
}
};
}
/**
* Format the results of the slowdown calculations to a OutputPojo.
*/
static class FormatStatsFn implements Function.MapFunction<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>, OutputPojo>
{
@Override
public OutputPojo f(Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>> input)
{
RouteInfo routeInfo = input.getValue().getValue();
OutputPojo row = new OutputPojo(routeInfo.getAvgSpeed(), routeInfo.getSlowdownEvent(), input.getValue().getKey(), input.getTimestamp());
return row;
}
}
/**
* This composite transformation extracts speed info from traffic station readings.
* It groups the readings by 'route' and analyzes traffic slowdown for that route.
* Lastly, it formats the results for JDBC.
*/
static class TrackSpeed extends
CompositeStreamTransform<WindowedStream<KeyValPair<String, StationSpeed>>, WindowedStream<OutputPojo>>
{
@Override
public WindowedStream<OutputPojo> compose(WindowedStream<KeyValPair<String, StationSpeed>> inputStream)
{
// Apply a GroupByKey transform to collect a list of all station
// readings for a given route.
WindowedStream<Tuple.WindowedTuple<KeyValPair<String, List<StationSpeed>>>> timeGroup =
inputStream
.accumulateByKey(new Group<StationSpeed>(), new Function.ToKeyValue<KeyValPair<String, StationSpeed>, String, StationSpeed>()
{
@Override
public Tuple<KeyValPair<String, StationSpeed>> f(KeyValPair<String, StationSpeed> input)
{
return new Tuple.TimestampedTuple<>(input.getValue().getTimestamp(), input);
}
}, name("GroupByKey"));
// Analyze 'slowdown' over the route readings.
WindowedStream<Tuple.TimestampedTuple<KeyValPair<String, RouteInfo>>> stats = timeGroup
.flatMap(new GatherStats(), name("GatherStats"));
// Format the results for writing to JDBC table.
WindowedStream<OutputPojo> results = stats.map(new FormatStatsFn(), name("FormatStatsFn"));
return results;
}
}
private static Double tryParseAvgSpeed(String[] inputItems)
{
try {
return Double.parseDouble(tryParseString(inputItems, 3));
} catch (NumberFormatException e) {
return null;
} catch (NullPointerException e) {
return null;
}
}
private static String tryParseStationType(String[] inputItems)
{
return tryParseString(inputItems, 2);
}
private static String tryParseStationId(String[] inputItems)
{
return tryParseString(inputItems, 1);
}
private static String tryParseTimestamp(String[] inputItems)
{
return tryParseString(inputItems, 0);
}
private static String tryParseString(String[] inputItems, int index)
{
return inputItems.length >= index ? inputItems[index] : null;
}
/**
* Define some small hard-wired San Diego 'routes' to track based on sensor station ID.
*/
private static Map<String, String> buildStationInfo()
{
Map<String, String> stations = new Hashtable<String, String>();
stations.put("1108413", "SDRoute1"); // from freeway 805 S
stations.put("1108699", "SDRoute2"); // from freeway 78 E
stations.put("1108702", "SDRoute2");
return stations;
}
/**
* A dummy generator to generate some traffic information.
*/
public static class InfoGen extends BaseOperator implements InputOperator
{
public transient DefaultOutputPort<String> output = new DefaultOutputPort<>();
private String[] stationTypes = new String[]{"ML", "BL", "GL"};
private int[] stationIDs = new int[]{1108413, 1108699, 1108702};
private double ave = 55.0;
private long timestamp;
private static final Duration RAND_RANGE = Duration.standardMinutes(10);
private static int tupleCount = 0;
public static int getTupleCount()
{
return tupleCount;
}
@Override
public void setup(Context.OperatorContext context)
{
tupleCount = 0;
timestamp = System.currentTimeMillis();
}
@Override
public void emitTuples()
{
for (String stationType : stationTypes) {
for (int stationID : stationIDs) {
double speed = Math.random() * 20 + ave;
long time = (long)(Math.random() * RAND_RANGE.getMillis()) + timestamp;
try {
output.emit(time + "," + stationID + "," + stationType + "," + speed);
tupleCount++;
Thread.sleep(50);
} catch (Exception e) {
// Ignore it
}
}
}
}
}
@Override
public void populateDAG(DAG dag, Configuration conf)
{
InfoGen infoGen = new InfoGen();
Collector collector = new Collector();
// Create a stream from the input operator.
ApexStream<Tuple.TimestampedTuple<String>> stream = StreamFactory.fromInput(infoGen, infoGen.output, name("infoGen"))
// Extract the timestamp from the input and wrap it into a TimestampedTuple.
.map(new ExtractTimestamps(), name("ExtractTimestamps"));
stream
// Extract the average speed of a station.
.flatMap(new ExtractStationSpeedFn(), name("ExtractStationSpeedFn"))
// Apply window and trigger option.
.window(new WindowOption.SlidingTimeWindows(Duration.standardMinutes(WINDOW_DURATION), Duration.standardMinutes(WINDOW_SLIDE_EVERY)), new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(5000)).accumulatingFiredPanes())
// Apply TrackSpeed composite transformation to compute the route information.
.addCompositeStreams(new TrackSpeed())
// print the result to console.
.print()
.endWith(collector, collector.input, name("Collector"))
.populateDag(dag);
}
}