long-ride-alerts/README.md

Lab: ProcessFunction and Timers (Long Ride Alerts)

The goal of the “Long Ride Alerts” exercise is to provide a real-time warning whenever a taxi ride that started two hours ago is still ongoing.

This should be done using the event time timestamps and watermarks that are provided in the data stream.

The stream is out-of-order, so the END event for a ride may be processed before its START event. In such cases no alert should be created, because the ride is already known to have ended.

Input Data

The input data of this exercise is a DataStream of taxi ride events.

Expected Output

The goal of this exercise is not to find all rides that lasted for more than two hours, but rather to create an alert in real time at the moment it becomes known that a ride has been going on for more than two hours.

The result of the exercise should be a DataStream<TaxiRide> that contains only the START events of taxi rides that started two hours earlier and whose END event hasn't yet arrived.
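
The alert condition described above can be written down as a small piece of arithmetic. The following is only a sketch in plain Java, not part of the exercise code: the class `LongRideRule` and its methods are hypothetical names, timestamps are assumed to be epoch milliseconds, and the watermark is passed in by hand rather than taken from Flink.

```java
import java.time.Duration;

// Hypothetical helper for illustration; it does not exist in the exercise sources.
class LongRideRule {
    static final long TWO_HOURS_MS = Duration.ofHours(2).toMillis();

    // Event-time instant at which a ride that started at startMs counts as "long".
    static long alertTime(long startMs) {
        return startMs + TWO_HOURS_MS;
    }

    // An alert is due once the watermark has reached the alert time
    // and no END event has been seen for the ride.
    static boolean alertDue(long startMs, boolean endSeen, long watermarkMs) {
        return !endSeen && watermarkMs >= alertTime(startMs);
    }

    public static void main(String[] args) {
        long start = 0L;
        System.out.println(alertDue(start, false, TWO_HOURS_MS - 1)); // false: too early
        System.out.println(alertDue(start, false, TWO_HOURS_MS));     // true: still ongoing
        System.out.println(alertDue(start, true, TWO_HOURS_MS));      // false: ride has ended
    }
}
```

Note that the decision depends on the watermark, not on the wall clock, which is why the alert appears at the moment it becomes known that the ride has lasted two hours.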

The resulting stream should be printed to standard out.

Getting Started

:information_source: Rather than following these links to the sources, you might prefer to open these classes in your IDE.

Exercise Classes

Tests

Implementation Hints

This exercise revolves around using a ProcessFunction to manage some keyed state and event time timers, and doing so in a way that works even when the END event for a given rideId arrives before the START (which can happen). The challenge is figuring out what state to keep, and when to set and clear that state. You will want to use event time timers that fire two hours after an incoming START event. In the onTimer() method, collect the START event to the output only if a matching END event hasn't yet arrived.

There are many possible solutions for this exercise, but in general it is enough to keep one TaxiRide in state (that is, one TaxiRide per key, where the key is the rideId). The approach used in the reference solution is to store whichever event arrives first (the START or the END), and if it's a START event, create a timer for two hours later. If and when the other event (for the same rideId) arrives, carefully clean things up.

It is possible to arrange this so that if onTimer() is called, you are guaranteed that an alert (i.e., the ride kept in state) should be emitted. Writing the code this way conveniently puts all of the complex business logic together in one place (in the processElement() method).
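
To make the bookkeeping concrete, here is a minimal sketch of the approach described above as a plain-Java simulation. It deliberately avoids the Flink API: the class `LongRideSketch`, the simplified `Ride` type, the two maps standing in for keyed state and registered timers, and the `advanceWatermark` method are all assumptions made for illustration, not the reference solution.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Plain-Java model of the state and timer logic; not Flink code.
class LongRideSketch {
    static final long TWO_HOURS_MS = 2 * 60 * 60 * 1000L;

    // Simplified, hypothetical stand-in for TaxiRide.
    static final class Ride {
        final long rideId;
        final boolean isStart;
        final long timestampMs;

        Ride(long rideId, boolean isStart, long timestampMs) {
            this.rideId = rideId;
            this.isStart = isStart;
            this.timestampMs = timestampMs;
        }
    }

    final Map<Long, Ride> state = new HashMap<>();  // one stored event per rideId
    final Map<Long, Long> timers = new HashMap<>(); // rideId -> timer timestamp
    final List<Ride> alerts = new ArrayList<>();    // emitted START events

    void processElement(Ride ride) {
        Ride stored = state.get(ride.rideId);
        if (stored == null) {
            // First event for this ride: remember it, and set a timer only for a START.
            state.put(ride.rideId, ride);
            if (ride.isStart) {
                timers.put(ride.rideId, ride.timestampMs + TWO_HOURS_MS);
            }
        } else {
            // Second event for this ride: if it is the END, a timer may be pending.
            if (!ride.isStart) {
                timers.remove(ride.rideId);
            }
            state.remove(ride.rideId);
        }
    }

    // Fires every timer whose timestamp the watermark has reached.
    void advanceWatermark(long watermarkMs) {
        Iterator<Map.Entry<Long, Long>> it = timers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Long> timer = it.next();
            if (timer.getValue() <= watermarkMs) {
                long rideId = timer.getKey();
                it.remove();
                // "onTimer": the stored event is necessarily an unmatched START.
                alerts.add(state.remove(rideId));
            }
        }
    }

    public static void main(String[] args) {
        LongRideSketch s = new LongRideSketch();
        s.processElement(new Ride(1, true, 0));              // normal ride
        s.processElement(new Ride(1, false, 30 * 60 * 1000L));
        s.processElement(new Ride(2, false, 60 * 60 * 1000L)); // END before START
        s.processElement(new Ride(2, true, 0));
        s.processElement(new Ride(3, true, 0));              // never ends
        s.advanceWatermark(TWO_HOURS_MS);
        for (Ride r : s.alerts) {
            System.out.println("long ride: " + r.rideId);    // prints "long ride: 3"
        }
    }
}
```

Because the timer is removed as soon as the END event arrives, the timer callback in this sketch never needs to check anything before emitting. All of the decisions live in `processElement`.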

Documentation

Reference Solutions

Reference solutions are available at GitHub:


Lab Discussion: ProcessFunction and Timers (Long Ride Alerts)

Back to Labs Overview