ProcessFunction and Timers (Long Ride Alerts)

The goal of the “Long Ride Alerts” exercise is to indicate whenever a taxi ride that started two hours ago is still ongoing.
The input data of this exercise is a DataStream of taxi ride events. You will want to use a TaxiRideSource, as described in the page about the Taxi Data Stream.
You can filter the events to only include rides within New York City (as is done in the Taxi Ride Cleansing exercise), but it is not essential for this lab.
The result of the exercise should be a DataStream<TaxiRide> that only contains START events of taxi rides which have no matching END event within the first two hours of the ride.
The resulting stream should be printed to standard out.
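To make the expected result concrete, here is a minimal batch-style sketch of the rule above, written in plain Java without any Flink dependency. It can serve as a reference when checking a streaming solution against a small, bounded set of events. The `LongRideSpec` class, its `Event` type, and the `longRideIds` method are hypothetical names invented for this illustration, and treating an END that arrives exactly two hours after the START as "within the first two hours" is an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Batch-style statement of the expected result; not part of the training code. */
class LongRideSpec {
    static final long TWO_HOURS_MS = 2 * 60 * 60 * 1000L;

    /** Hypothetical stand-in for TaxiRide, reduced to the fields this rule needs. */
    static final class Event {
        final long rideId;
        final boolean isStart;
        final long timeMs;

        Event(long rideId, boolean isStart, long timeMs) {
            this.rideId = rideId;
            this.isStart = isStart;
            this.timeMs = timeMs;
        }
    }

    /** Returns the sorted rideIds whose START has no END at or before START + two hours. */
    static List<Long> longRideIds(List<Event> events) {
        Map<Long, Long> startTimes = new HashMap<>();
        Map<Long, Long> endTimes = new HashMap<>();
        for (Event e : events) {
            (e.isStart ? startTimes : endTimes).put(e.rideId, e.timeMs);
        }
        List<Long> result = new ArrayList<>();
        for (Map.Entry<Long, Long> start : startTimes.entrySet()) {
            Long end = endTimes.get(start.getKey());
            // Assumption: an END exactly two hours after the START still counts as "within".
            if (end == null || end - start.getValue() > TWO_HOURS_MS) {
                result.add(start.getKey());
            }
        }
        Collections.sort(result);
        return result;
    }
}
```

Because it sees all events at once, this version does not depend on arrival order. The streaming solution has to reach the same answer while events are still arriving.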
Here are the rideIds and start times of the first few rides that go on for more than two hours, but you might want to print other info as well:

    > 2758,2013-01-01 00:10:13
    > 7575,2013-01-01 00:20:23
    > 22131,2013-01-01 00:47:03
    > 25473,2013-01-01 00:53:10
    > 29907,2013-01-01 01:01:15
    > 30796,2013-01-01 01:03:00
    ...
Note: Rather than following links to the sources, you might prefer to open these classes in your IDE:
org.apache.flink.training.exercises.longrides.LongRidesExercise
org.apache.flink.training.exercises.longrides.scala.LongRidesExercise
org.apache.flink.training.exercises.longrides.LongRidesTest
org.apache.flink.training.exercises.longrides.scala.LongRidesTest
This exercise revolves around using a ProcessFunction to manage some keyed state and event time timers, and doing so in a way that works even when the END event for a given rideId arrives before the START (which will happen). The challenge is figuring out what state to keep, and when to set and clear that state.
You will want to use event time timers that fire two hours after the timestamp of each incoming event, and in the onTimer() method, collect START events to the output only if a matching END event hasn't yet arrived. As for what state to keep, it is enough to remember the “last” event for each rideId, where “last” is based on event time and ride type (START vs END; yes, there are rides where the START and END have the same timestamp), rather than on the order in which the events are processed. The TaxiRide class implements Comparable; feel free to take advantage of that, and be sure to eventually clear any state you create.
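The hints above can be sketched as a small plain-Java simulation, shown here only to illustrate the bookkeeping. It is not the reference solution and does not use the Flink API: the map stands in for keyed state, the sorted map stands in for the timer service, and `advanceWatermark` plays the role of the watermark triggering `onTimer()`. The `LongRideSketch` and `RideEvent` names and the ordering in `compareTo` are assumptions made for this example; the real TaxiRide class has its own Comparable implementation.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Plain-Java simulation of the state-plus-timer logic; none of these names come from Flink. */
class LongRideSketch {
    static final long TWO_HOURS_MS = 2 * 60 * 60 * 1000L;

    /** Hypothetical stand-in for TaxiRide with only the fields this sketch needs. */
    static final class RideEvent implements Comparable<RideEvent> {
        final long rideId;
        final boolean isStart;
        final long eventTimeMs;

        RideEvent(long rideId, boolean isStart, long eventTimeMs) {
            this.rideId = rideId;
            this.isStart = isStart;
            this.eventTimeMs = eventTimeMs;
        }

        @Override
        public int compareTo(RideEvent other) {
            int byTime = Long.compare(eventTimeMs, other.eventTimeMs);
            if (byTime != 0) {
                return byTime;
            }
            // Same timestamp: a START sorts before an END.
            return Boolean.compare(!isStart, !other.isStart);
        }
    }

    // Stand-in for keyed state: the "last" event seen so far for each rideId.
    private final Map<Long, RideEvent> lastEvent = new HashMap<>();
    // Stand-in for the timer service: timer timestamp -> rideIds with a timer at that time.
    private final TreeMap<Long, Set<Long>> timers = new TreeMap<>();

    /** Keeps the later of the saved and incoming events, then sets a timer two hours out. */
    void processElement(RideEvent ride) {
        RideEvent saved = lastEvent.get(ride.rideId);
        if (saved == null || ride.compareTo(saved) > 0) {
            lastEvent.put(ride.rideId, ride);
        }
        timers.computeIfAbsent(ride.eventTimeMs + TWO_HOURS_MS, t -> new HashSet<>())
              .add(ride.rideId);
    }

    /** Fires every timer at or before the watermark; returns the START events to alert on. */
    List<RideEvent> advanceWatermark(long watermarkMs) {
        List<RideEvent> alerts = new ArrayList<>();
        while (!timers.isEmpty() && timers.firstKey() <= watermarkMs) {
            for (long rideId : timers.pollFirstEntry().getValue()) {
                RideEvent saved = lastEvent.remove(rideId); // read the state and clear it
                if (saved != null && saved.isStart) {
                    alerts.add(saved);
                }
            }
        }
        return alerts;
    }

    public static void main(String[] args) {
        LongRideSketch fn = new LongRideSketch();
        // Ride 1 lasts ten minutes and its END arrives before its START: no alert.
        fn.processElement(new RideEvent(1, false, 10 * 60 * 1000L));
        fn.processElement(new RideEvent(1, true, 0L));
        // Ride 2 has only a START event so far.
        fn.processElement(new RideEvent(2, true, 60 * 1000L));
        for (RideEvent alert : fn.advanceWatermark(3 * 60 * 60 * 1000L)) {
            System.out.println("long ride: " + alert.rideId); // prints "long ride: 2"
        }
    }
}
```

Note that this simplified version only inspects the state when a timer fires. If an END event for a ride longer than two hours is processed before the watermark reaches the START's timer, the saved event is already an END and no alert is produced, so a complete solution needs to consider that case.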
Reference solutions are available on GitHub:
org.apache.flink.training.solutions.longrides.LongRidesSolution
org.apache.flink.training.solutions.longrides.scala.LongRidesSolution
Lab Discussion: ProcessFunction and Timers (Long Ride Alerts)