
Lab: Stateful Enrichment (Rides and Fares)

The goal of this exercise is to join together the TaxiRide and TaxiFare records for each ride.

For each distinct rideId, there are exactly three events:

  1. a TaxiRide START event
  2. a TaxiRide END event
  3. a TaxiFare event (whose timestamp happens to match the start time)

The result should be a DataStream<Tuple2<TaxiRide, TaxiFare>>, with one record for each distinct rideId. Each tuple should pair the TaxiRide START event for some rideId with its matching TaxiFare.

Input Data

For this exercise you will work with two data streams, one with TaxiRide events generated by a TaxiRideSource and the other with TaxiFare events generated by a TaxiFareSource. See Using the Taxi Data Streams for information on how to download the data and how to work with these stream generators.

Expected Output

The result of this exercise is a data stream of Tuple2<TaxiRide, TaxiFare> records, one for each distinct rideId. The exercise is set up to ignore the END events; you should join the START event of each ride with its corresponding fare event.

The resulting stream is printed to standard out.

Getting Started

Note: Rather than following these links to the sources, you might prefer to open these classes in your IDE.

Exercise Classes

Tests

Implementation Hints

You can use a RichCoFlatMapFunction to implement this join operation. Note that you have no control over the order in which the ride and fare records for each rideId arrive, so you need to be prepared to store whichever record arrives first until its match arrives, at which point you can emit a Tuple2<TaxiRide, TaxiFare> joining the two records together.

Use Flink's managed, keyed state to buffer each record until its matching event arrives, and be sure to clear the state once it is no longer needed.
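The buffering logic described in these hints can be sketched in plain Java, without any Flink dependencies. This is only a model of the idea, not the reference solution: the Ride, Fare, and Joined classes are hypothetical stand-ins for TaxiRide, TaxiFare, and Tuple2<TaxiRide, TaxiFare>, and the two maps stand in for per-key managed state. The onRide and onFare methods play the roles that flatMap1 and flatMap2 would play in a two-input function.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java model of the buffering join; the maps stand in for Flink keyed state. */
class RideFareJoinSketch {

    /** Hypothetical stand-in for a TaxiRide START event. */
    static final class Ride {
        final long rideId;
        Ride(long rideId) { this.rideId = rideId; }
    }

    /** Hypothetical stand-in for a TaxiFare event. */
    static final class Fare {
        final long rideId;
        final double totalFare;
        Fare(long rideId, double totalFare) { this.rideId = rideId; this.totalFare = totalFare; }
    }

    /** Hypothetical stand-in for Tuple2<TaxiRide, TaxiFare>. */
    static final class Joined {
        final Ride ride;
        final Fare fare;
        Joined(Ride ride, Fare fare) { this.ride = ride; this.fare = fare; }
    }

    // One slot per rideId for each side, mimicking two per-key state values.
    private final Map<Long, Ride> pendingRides = new HashMap<>();
    private final Map<Long, Fare> pendingFares = new HashMap<>();
    private final List<Joined> output = new ArrayList<>();

    /** Handles a ride START event (the role of flatMap1). */
    void onRide(Ride ride) {
        // remove() both reads and clears the buffered fare, if there is one.
        Fare fare = pendingFares.remove(ride.rideId);
        if (fare != null) {
            output.add(new Joined(ride, fare));
        } else {
            pendingRides.put(ride.rideId, ride); // fare not seen yet: buffer the ride
        }
    }

    /** Handles a fare event (the role of flatMap2). */
    void onFare(Fare fare) {
        Ride ride = pendingRides.remove(fare.rideId);
        if (ride != null) {
            output.add(new Joined(ride, fare));
        } else {
            pendingFares.put(fare.rideId, fare); // ride not seen yet: buffer the fare
        }
    }

    List<Joined> output() { return output; }

    /** Number of events still waiting for a partner. */
    int bufferedCount() { return pendingRides.size() + pendingFares.size(); }
}
```

Whichever event arrives first is buffered, and the second arrival both emits the pair and clears the buffered entry, so nothing is left behind for a completed rideId. In an actual Flink job the stream is keyed by rideId, so each side would need only a single state value per key rather than a map.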

Discussion

For the purposes of this exercise it's okay to assume that the START and fare events are perfectly paired. But in a real-world application you should worry about the fact that whenever an event is missing, the other event for the same rideId will be held in state forever. In a later lab we'll look at the ProcessFunction and Timers, which may help with this situation.
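To make the unbounded-state concern concrete, here is a minimal plain-Java sketch of expiring buffered events after a timeout. It is not Flink code and not the approach of the later lab: the class name, the timeout parameter, and the advanceTimeTo method are all hypothetical, with advanceTimeTo standing in for a timer firing. It also assumes exactly one START and one fare event per rideId, so the second event seen for a key is always treated as the partner.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

/** Plain-Java model of evicting unmatched events; not Flink API. */
class ExpiringBufferSketch {

    /** A buffered event together with the time at which it should be dropped. */
    static final class Pending {
        final String description;
        final long expiresAt;
        Pending(String description, long expiresAt) {
            this.description = description;
            this.expiresAt = expiresAt;
        }
    }

    private final long timeoutMillis; // hypothetical timeout, chosen by the caller
    private final Map<Long, Pending> pending = new HashMap<>();
    private final List<String> unmatched = new ArrayList<>();

    ExpiringBufferSketch(long timeoutMillis) { this.timeoutMillis = timeoutMillis; }

    /** Returns true if this event completed a pair; otherwise buffers it with a deadline. */
    boolean onEvent(long rideId, String description, long timestamp) {
        if (pending.remove(rideId) != null) {
            return true; // partner was waiting: state for this rideId is now cleared
        }
        pending.put(rideId, new Pending(description, timestamp + timeoutMillis));
        return false;
    }

    /** Models a timer firing: drop every buffered event whose deadline has passed. */
    void advanceTimeTo(long now) {
        Iterator<Map.Entry<Long, Pending>> it = pending.entrySet().iterator();
        while (it.hasNext()) {
            Pending p = it.next().getValue();
            if (p.expiresAt <= now) {
                unmatched.add(p.description); // report it instead of holding it forever
                it.remove();
            }
        }
    }

    int pendingCount() { return pending.size(); }

    List<String> unmatched() { return unmatched; }
}
```

With a deadline attached to each buffered event, an event whose partner never arrives is eventually reported and removed rather than held indefinitely. How long to wait, and what to do with the unmatched events, are application decisions this sketch leaves open.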

Documentation

Reference Solutions

Reference solutions are available in this project:

