[FLINK-18482] Replace file-based data sources with generators

This closes #11
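
Note on the generator approach: the new TaxiRide(rideId, isStart) and
TaxiFare(rideId) constructors both build a DataGenerator from the rideId, so
a ride and its fare can agree on shared fields without reading any file. The
DataGenerator implementation is not part of this excerpt. The sketch below is
only an illustration of the idea under stated assumptions: the class
SeededRideData, its base time, and its value ranges are all hypothetical and
are not the project's code.

```java
import java.time.Instant;
import java.util.Random;

/** Hypothetical stand-in for DataGenerator: every value is a pure function of rideId. */
final class SeededRideData {
    // Assumed base time; the real generator may use a different one.
    private static final Instant BEGIN = Instant.parse("2020-01-01T12:00:00Z");

    private final long rideId;

    SeededRideData(long rideId) {
        this.rideId = rideId;
    }

    // A fresh Random per call, seeded from rideId and a per-field salt,
    // makes each accessor repeatable and independent of call order.
    private Random rnd(long salt) {
        return new Random(rideId * 31 + salt);
    }

    long taxiId() {
        return 2013000000L + rnd(1).nextInt(200);
    }

    // Start times grow with rideId, so a source that emits ids in
    // ascending order also emits timestamps in ascending order.
    Instant startTime() {
        return BEGIN.plusSeconds(20 * rideId);
    }

    float totalFare() {
        return 5.0f + rnd(2).nextInt(50);
    }
}

public class SeededRideDataDemo {
    public static void main(String[] args) {
        // Two independent instances for the same ride agree on every field.
        SeededRideData forRide = new SeededRideData(42);
        SeededRideData forFare = new SeededRideData(42);
        System.out.println(forRide.taxiId() == forFare.taxiId());
        System.out.println(forRide.startTime().equals(forFare.startTime()));
    }
}
```

With a scheme like this, the ride stream and the fare stream can be produced
by separate source instances and still be joined on rideId.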
diff --git a/README.md b/README.md
index c559fed..f2f80b8 100644
--- a/README.md
+++ b/README.md
@@ -28,7 +28,6 @@
 1. [Software requirements](#software-requirements)
 1. [Clone and build the flink-training project](#clone-and-build-the-flink-training-project)
 1. [Import the flink-training project into your IDE](#import-the-flink-training-project-into-your-ide)
-1. [Download the data sets](#download-the-data-sets)
 
 [**Using the Taxi Data Streams**](#using-the-taxi-data-streams)
 
@@ -59,7 +58,7 @@
 - an IDE for Java (and/or Scala) development with Gradle support.
   We recommend IntelliJ, but Eclipse or Visual Studio Code can also be used so long as you stick to Java. For Scala you will need to use IntelliJ (and its Scala plugin).
 
-> **:information_source: Note for Windows users:** Many of the examples of shell commands provided in the training instructions are for UNIX systems. To make things easier, you may find it worthwhile to setup cygwin or WSL. For developing Flink jobs, Windows works reasonably well: you can run a Flink cluster on a single machine, submit jobs, run the webUI, and execute jobs in the IDE.
+> **:information_source: Note for Windows users:** The examples of shell commands provided in the training instructions are for UNIX systems. To make things easier, you may find it worthwhile to set up Cygwin or WSL. For developing Flink jobs, Windows works reasonably well: you can run a Flink cluster on a single machine, submit jobs, run the web UI, and execute jobs in the IDE.
 
 ### Clone and build the flink-training project
 
@@ -110,24 +109,13 @@
 
 > **:information_source: Note for Scala users:** You will need to use IntelliJ with the JetBrains Scala plugin, and you will need to add a Scala 2.12 SDK to the Global Libraries section of the Project Structure. IntelliJ will ask you for the latter when you open a Scala file.
 
-### Download the data sets
-
-You will also need to download the taxi data files used in this training by running the following commands
-
-```bash
-wget http://training.ververica.com/trainingData/nycTaxiRides.gz
-wget http://training.ververica.com/trainingData/nycTaxiFares.gz
-```
-
-It doesn’t matter if you use `wget` or something else (like `curl`, or Chrome) to download these files, but however you get the data, **do not decompress or rename the `.gz` files**. Some browsers will do the wrong thing by default.
-
-> **:information_source: Note:** There are hardwired paths to these data files in the exercises, in `org.apache.flink.training.exercises.common.utils.ExerciseBase`. The tests don’t use these data files, but before running the exercises, you will need to fix these paths to point to the files you have downloaded.
-
 ## Using the Taxi Data Streams
 
-The [New York City Taxi & Limousine Commission](http://www.nyc.gov/html/tlc/html/home/home.shtml) provides a public [data set](https://uofi.app.box.com/NYCtaxidata) about taxi rides in New York City from 2009 to 2015. We use a modified subset of this data to generate streams of events about taxi rides. You should have downloaded these in the [steps above](#download-the-data-sets).
+These exercises use data generators that produce simulated event streams inspired by those shared by the
+[New York City Taxi & Limousine Commission](http://www.nyc.gov/html/tlc/html/home/home.shtml)
+in their public [data set](https://uofi.app.box.com/NYCtaxidata) about taxi rides in New York City.
 
-### Schema of Taxi Ride Events
+### Schemas of Taxi Ride and Taxi Fare Events
 
 Our taxi data set contains information about individual taxi rides in New York City. Each ride is represented by two events: a trip start, and a trip end event. Each event consists of eleven fields:
 
@@ -136,8 +124,8 @@
 taxiId         : Long      // a unique id for each taxi
 driverId       : Long      // a unique id for each driver
 isStart        : Boolean   // TRUE for ride start events, FALSE for ride end events
-startTime      : DateTime  // the start time of a ride
-endTime        : DateTime  // the end time of a ride,
+startTime      : Instant   // the start time of a ride
+endTime        : Instant   // the end time of a ride,
                            //   "1970-01-01 00:00:00" for start events
 startLon       : Float     // the longitude of the ride start location
 startLat       : Float     // the latitude of the ride start location
@@ -146,80 +134,19 @@
 passengerCnt   : Short     // number of passengers on the ride
 ```
 
-> **:information_source: Note:** The data set contains records with invalid or missing coordinate information (longitude and latitude are 0.0).
-
-There is also a related data set containing taxi ride fare data, with these fields:
+There is also a related data set containing fare data about those same rides, with these fields:
 
 ```
 rideId         : Long      // a unique id for each ride
 taxiId         : Long      // a unique id for each taxi
 driverId       : Long      // a unique id for each driver
-startTime      : DateTime  // the start time of a ride
-paymentType    : String    // CSH or CRD
+startTime      : Instant   // the start time of a ride
+paymentType    : String    // CASH or CARD
 tip            : Float     // tip for this ride
 tolls          : Float     // tolls for this ride
 totalFare      : Float     // total fare collected
 ```
 
-### Generating Taxi Ride Data Streams in a Flink program
-
-> **:information_source: Note: The exercises already provide code for working with these taxi ride data streams.**
-
-We provide a Flink source function (`TaxiRideSource`) that reads a `.gz` file with taxi ride records and emits a stream of `TaxiRide` events. The source operates in [event-time](https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html). There’s an analogous source function (`TaxiFareSource`) for `TaxiFare` events.
-
-In order to generate these streams as realistically as possible, events are emitted proportional to their timestamp. Two events that occurred ten minutes after each other in reality are also served ten minutes after each other. A speed-up factor can be specified to “fast-forward” the stream, i.e., given a speed-up factor of `60`, events that happened within one minute are served in one second. Moreover, one can specify a maximum serving delay which causes each event to be randomly delayed within the specified bound. This yields an out-of-order stream as is common in many real-world applications.
-
-For these exercises, a speed-up factor of `600` or more (i.e., 10 minutes of event time for every second of processing), and a maximum delay of `60` (seconds) will work well.
-
-All exercises should be implemented using event-time characteristics. Event-time decouples the program semantics from serving speed and guarantees consistent results even in case of historic data or data which is delivered out-of-order.
-
-#### How to use these sources
-
-**Java**
-
-```java
-// get an ExecutionEnvironment
-StreamExecutionEnvironment env =
-  StreamExecutionEnvironment.getExecutionEnvironment();
-// configure event-time processing
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-
-// get the taxi ride data stream
-DataStream<TaxiRide> rides = env.addSource(
-  new TaxiRideSource("/path/to/nycTaxiRides.gz", maxDelay, servingSpeed));
-```
-
-**Scala**
-
-```scala
-// get an ExecutionEnvironment
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-// configure event-time processing
-env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
-// get the taxi ride data stream
-val rides = env.addSource(
-  new TaxiRideSource("/path/to/nycTaxiRides.gz", maxDelay, servingSpeed))
-```
-
-There is also a `TaxiFareSource` that works in an analogous fashion, using the `nycTaxiFares.gz` file. This source creates a stream of `TaxiFare` events.
-
-**Java**
-
-```java
-// get the taxi fare data stream
-DataStream<TaxiFare> fares = env.addSource(
-  new TaxiFareSource("/path/to/nycTaxiFares.gz", maxDelay, servingSpeed));
-```
-
-**Scala**
-
-```scala
-// get the taxi fare data stream
-val fares = env.addSource(
-  new TaxiFareSource("/path/to/nycTaxiFares.gz", maxDelay, servingSpeed))
-```
-
 ## How to do the Labs
 
 In the hands-on sessions you will implement Flink programs using various Flink APIs.
@@ -232,17 +159,6 @@
 
 The initial set of exercises are all based on data streams of events about taxi rides and taxi fares. These streams are produced by source functions which reads data from input files. Please read the [instructions above](#using-the-taxi-data-streams) to learn how to use them.
 
-### Modify `ExerciseBase`
-
-After downloading the datasets, open the `org.apache.flink.training.exercises.common.utils.ExerciseBase` class in your IDE, and edit these two lines to point to the two taxi ride data files you have downloaded:
-
-```java
-public final static String PATH_TO_RIDE_DATA =
-    "/Users/david/stuff/flink-training/trainingData/nycTaxiRides.gz";
-public final static String PATH_TO_FARE_DATA =
-    "/Users/david/stuff/flink-training/trainingData/nycTaxiFares.gz";
-```
-
 ### Run and debug Flink programs in your IDE
 
 Flink programs can be executed and debugged from within an IDE. This significantly eases the development process and provides an experience similar to working on any other Java (or Scala) application.
@@ -256,10 +172,12 @@
 
 ### Exercises, Tests, and Solutions
 
-Many of the exercises include an `...Exercise` class with most of the necessary boilerplate code for getting started, as well as a JUnit Test class (`...Test`) with a few tests for your implementation, and a `...Solution` class with a complete solution.
+Each of these exercises includes an `...Exercise` class with most of the necessary boilerplate code for getting started, as well as a JUnit Test class (`...Test`) with a few tests for your implementation, and a `...Solution` class with a complete solution.
 
 > **:information_source: Note:** As long as your `...Exercise` class is throwing a `MissingSolutionException`, the provided JUnit test classes will ignore that failure and verify the correctness of the solution implementation instead.
 
+There are Java and Scala versions of all the exercise, test, and solution classes.
+
 -----
 
 Now you are ready to begin with the first exercise in our [**Labs**](LABS-OVERVIEW.md).
diff --git a/build.gradle b/build.gradle
index ced3f7f..300433c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -86,8 +86,6 @@
         implementation "log4j:log4j:${log4jVersion}"
         implementation "org.slf4j:slf4j-log4j12:${slf4jVersion}"
 
-        flinkShadowJar "joda-time:joda-time:2.7"
-
         if (project != project(":common")) {
             implementation project(path: ':common')
             // transitive dependencies for flinkShadowJar need to be defined above
diff --git a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
index 35ed7f4..74e56fe 100644
--- a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
+++ b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
@@ -20,20 +20,15 @@
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.common.sources.TaxiRideSource;
-import org.apache.flink.training.exercises.common.utils.ExerciseBase;
+import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
 
 /**
  * Example that counts the rides for each driver.
  *
- * <p>Parameters:
- *   -input path-to-input-file
- *
  * <p>Note that this is implicitly keeping state for each driver.
  * This sort of simple, non-windowed aggregation on an unbounded set of keys will use an unbounded amount of state.
  * When this is an issue, look at the SQL/Table API, or ProcessFunction, or state TTL, all of which provide
@@ -44,24 +39,15 @@
 	/**
 	 * Main method.
 	 *
-	 * <p>Parameters:
-	 *   -input path-to-input-file
-	 *
 	 * @throws Exception which occurs during job execution.
 	 */
 	public static void main(String[] args) throws Exception {
 
-		ParameterTool params = ParameterTool.fromArgs(args);
-		final String input = params.get("input", ExerciseBase.PATH_TO_RIDE_DATA);
-
-		final int maxEventDelay = 60;       // events are out of order by max 60 seconds
-		final int servingSpeedFactor = 600; // events of 10 minutes are served every second
-
 		// set up streaming execution environment
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 
 		// start the data generator
-		DataStream<TaxiRide> rides = env.addSource(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor));
+		DataStream<TaxiRide> rides = env.addSource(new TaxiRideGenerator());
 
 		// map each ride to a tuple of (driverId, 1)
 		DataStream<Tuple2<Long, Long>> tuples = rides.map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
index f1da3e6..013f2d3 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
@@ -18,36 +18,55 @@
 
 package org.apache.flink.training.exercises.common.datatypes;
 
-import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
+import org.apache.flink.training.exercises.common.utils.DataGenerator;
 
 import java.io.Serializable;
-import java.util.Locale;
+import java.time.Instant;
 
 /**
- * A TaxiFare is a taxi fare event.
+ * A TaxiFare has payment information about a taxi ride.
  *
- * <p>A TaxiFare consists of
- * - the rideId of the event
- * - the time of the event
+ * <p>It has these fields in common with TaxiRide events
+ * - the rideId
+ * - the taxiId
+ * - the driverId
+ * - the startTime
+ *
+ * <p>It also includes
+ * - the paymentType
+ * - the tip
+ * - the tolls
+ * - the totalFare
  */
 public class TaxiFare implements Serializable {
 
-	private static final DateTimeFormatter TIME_FORMATTER =
-			DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();
-
 	/**
-	 * Creates a TaxiFare with now as start time.
+	 * Creates a TaxiFare with now as the start time.
 	 */
 	public TaxiFare() {
-		this.startTime = new DateTime();
+		this.startTime = Instant.now();
+	}
+
+	/**
+	 * Invents a TaxiFare whose fields are derived from the given rideId by a DataGenerator.
+	 */
+	public TaxiFare(long rideId) {
+		DataGenerator g = new DataGenerator(rideId);
+
+		this.rideId = rideId;
+		this.taxiId = g.taxiId();
+		this.driverId = g.driverId();
+		this.startTime = g.startTime();
+		this.paymentType = g.paymentType();
+		this.tip = g.tip();
+		this.tolls = g.tolls();
+		this.totalFare = g.totalFare();
 	}
 
 	/**
 	 * Creates a TaxiFare with the given parameters.
 	 */
-	public TaxiFare(long rideId, long taxiId, long driverId, DateTime startTime, String paymentType, float tip, float tolls, float totalFare) {
+	public TaxiFare(long rideId, long taxiId, long driverId, Instant startTime, String paymentType, float tip, float tolls, float totalFare) {
 		this.rideId = rideId;
 		this.taxiId = taxiId;
 		this.driverId = driverId;
@@ -61,7 +80,7 @@
 	public long rideId;
 	public long taxiId;
 	public long driverId;
-	public DateTime startTime;
+	public Instant startTime;
 	public String paymentType;
 	public float tip;
 	public float tolls;
@@ -69,45 +88,17 @@
 
 	@Override
 	public String toString() {
+
 		return rideId + "," +
 				taxiId + "," +
 				driverId + "," +
-				startTime.toString(TIME_FORMATTER) + "," +
+				startTime.toString() + "," +
 				paymentType + "," +
 				tip + "," +
 				tolls + "," +
 				totalFare;
 	}
 
-	/**
-	 * Parse a TaxiFare from a CSV representation.
-	 */
-	public static TaxiFare fromString(String line) {
-
-		String[] tokens = line.split(",");
-		if (tokens.length != 8) {
-			throw new RuntimeException("Invalid record: " + line);
-		}
-
-		TaxiFare ride = new TaxiFare();
-
-		try {
-			ride.rideId = Long.parseLong(tokens[0]);
-			ride.taxiId = Long.parseLong(tokens[1]);
-			ride.driverId = Long.parseLong(tokens[2]);
-			ride.startTime = DateTime.parse(tokens[3], TIME_FORMATTER);
-			ride.paymentType = tokens[4];
-			ride.tip = tokens[5].length() > 0 ? Float.parseFloat(tokens[5]) : 0.0f;
-			ride.tolls = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) : 0.0f;
-			ride.totalFare = tokens[7].length() > 0 ? Float.parseFloat(tokens[7]) : 0.0f;
-
-		} catch (NumberFormatException nfe) {
-			throw new RuntimeException("Invalid record: " + line, nfe);
-		}
-
-		return ride;
-	}
-
 	@Override
 	public boolean equals(Object other) {
 		return other instanceof TaxiFare &&
@@ -123,6 +114,7 @@
 	 * Gets the fare's start time.
 	 */
 	public long getEventTime() {
-		return startTime.getMillis();
+		return startTime.toEpochMilli();
 	}
+
 }
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
index a923a00..3bec5e7 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
@@ -18,16 +18,13 @@
 
 package org.apache.flink.training.exercises.common.datatypes;
 
+import org.apache.flink.training.exercises.common.utils.DataGenerator;
 import org.apache.flink.training.exercises.common.utils.GeoUtils;
 
-import org.joda.time.DateTime;
-import org.joda.time.format.DateTimeFormat;
-import org.joda.time.format.DateTimeFormatter;
-
 import javax.annotation.Nullable;
 
 import java.io.Serializable;
-import java.util.Locale;
+import java.time.Instant;
 
 /**
  * A TaxiRide is a taxi ride event. There are two types of events, a taxi ride start event and a
@@ -47,23 +44,39 @@
  */
 public class TaxiRide implements Comparable<TaxiRide>, Serializable {
 
-	private static final DateTimeFormatter TIME_FORMATTER =
-			DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();
-
 	/**
 	 * Creates a new TaxiRide with now as start and end time.
 	 */
 	public TaxiRide() {
-		this.startTime = new DateTime();
-		this.endTime = new DateTime();
+		this.startTime = Instant.now();
+		this.endTime = Instant.now();
+	}
+
+	/**
+	 * Invents a TaxiRide whose fields are derived from the given rideId by a DataGenerator.
+	 */
+	public TaxiRide(long rideId, boolean isStart) {
+		DataGenerator g = new DataGenerator(rideId);
+
+		this.rideId = rideId;
+		this.isStart = isStart;
+		this.startTime = g.startTime();
+		this.endTime = isStart ? Instant.ofEpochMilli(0) : g.endTime();
+		this.startLon = g.startLon();
+		this.startLat = g.startLat();
+		this.endLon = g.endLon();
+		this.endLat = g.endLat();
+		this.passengerCnt = g.passengerCnt();
+		this.taxiId = g.taxiId();
+		this.driverId = g.driverId();
 	}
 
 	/**
 	 * Creates a TaxiRide with the given parameters.
 	 */
-	public TaxiRide(long rideId, boolean isStart, DateTime startTime, DateTime endTime,
-					float startLon, float startLat, float endLon, float endLat,
-					short passengerCnt, long taxiId, long driverId) {
+	public TaxiRide(long rideId, boolean isStart, Instant startTime, Instant endTime,
+			float startLon, float startLat, float endLon, float endLat,
+			short passengerCnt, long taxiId, long driverId) {
 		this.rideId = rideId;
 		this.isStart = isStart;
 		this.startTime = startTime;
@@ -79,8 +92,8 @@
 
 	public long rideId;
 	public boolean isStart;
-	public DateTime startTime;
-	public DateTime endTime;
+	public Instant startTime;
+	public Instant endTime;
 	public float startLon;
 	public float startLat;
 	public float endLon;
@@ -91,10 +104,11 @@
 
 	@Override
 	public String toString() {
+
 		return rideId + "," +
 				(isStart ? "START" : "END") + "," +
-				startTime.toString(TIME_FORMATTER) + "," +
-				endTime.toString(TIME_FORMATTER) + "," +
+				startTime.toString() + "," +
+				endTime.toString() + "," +
 				startLon + "," +
 				startLat + "," +
 				endLon + "," +
@@ -105,51 +119,6 @@
 	}
 
 	/**
-	 * Parse a TaxiRide from a CSV representation.
-	 */
-	public static TaxiRide fromString(String line) {
-
-		String[] tokens = line.split(",");
-		if (tokens.length != 11) {
-			throw new RuntimeException("Invalid record: " + line);
-		}
-
-		TaxiRide ride = new TaxiRide();
-
-		try {
-			ride.rideId = Long.parseLong(tokens[0]);
-
-			switch (tokens[1]) {
-				case "START":
-					ride.isStart = true;
-					ride.startTime = DateTime.parse(tokens[2], TIME_FORMATTER);
-					ride.endTime = DateTime.parse(tokens[3], TIME_FORMATTER);
-					break;
-				case "END":
-					ride.isStart = false;
-					ride.endTime = DateTime.parse(tokens[2], TIME_FORMATTER);
-					ride.startTime = DateTime.parse(tokens[3], TIME_FORMATTER);
-					break;
-				default:
-					throw new RuntimeException("Invalid record: " + line);
-			}
-
-			ride.startLon = tokens[4].length() > 0 ? Float.parseFloat(tokens[4]) : 0.0f;
-			ride.startLat = tokens[5].length() > 0 ? Float.parseFloat(tokens[5]) : 0.0f;
-			ride.endLon = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) : 0.0f;
-			ride.endLat = tokens[7].length() > 0 ? Float.parseFloat(tokens[7]) : 0.0f;
-			ride.passengerCnt = Short.parseShort(tokens[8]);
-			ride.taxiId = Long.parseLong(tokens[9]);
-			ride.driverId = Long.parseLong(tokens[10]);
-
-		} catch (NumberFormatException nfe) {
-			throw new RuntimeException("Invalid record: " + line, nfe);
-		}
-
-		return ride;
-	}
-
-	/**
 	 * Compares this TaxiRide with the given one.
 	 *
 	 * <ul>
@@ -196,10 +165,10 @@
 	 */
 	public long getEventTime() {
 		if (isStart) {
-			return startTime.getMillis();
+			return startTime.toEpochMilli();
 		}
 		else {
-			return endTime.getMillis();
+			return endTime.toEpochMilli();
 		}
 	}
 
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java
new file mode 100644
index 0000000..c3ca076
--- /dev/null
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareGenerator.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.common.sources;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
+
+/**
+ * This SourceFunction generates a data stream of TaxiFare records that include event time
+ * timestamps.
+ *
+ * <p>The stream is generated in order, and it includes Watermarks.
+ *
+ */
+public class TaxiFareGenerator implements SourceFunction<TaxiFare> {
+
+	private volatile boolean running = true;
+
+	@Override
+	public void run(SourceContext<TaxiFare> ctx) throws Exception {
+
+		long id = 1;
+
+		while (running) {
+			TaxiFare fare = new TaxiFare(id);
+			id += 1;
+
+			ctx.collectWithTimestamp(fare, fare.getEventTime());
+			ctx.emitWatermark(new Watermark(fare.getEventTime()));
+
+			// match our event production rate to that of the TaxiRideGenerator
+			Thread.sleep(TaxiRideGenerator.SLEEP_MILLIS_PER_EVENT);
+		}
+	}
+
+	@Override
+	public void cancel() {
+		running = false;
+	}
+}
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareSource.java b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareSource.java
deleted file mode 100644
index a02eff6..0000000
--- a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareSource.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.training.exercises.common.sources;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.Calendar;
-import java.util.Comparator;
-import java.util.PriorityQueue;
-import java.util.Random;
-import java.util.zip.GZIPInputStream;
-
-/**
- * This SourceFunction generates a data stream of TaxiFare records which are
- * read from a gzipped input file. Each record has a time stamp and the input file must be
- * ordered by this time stamp.
- *
- * <p>In order to simulate a realistic stream source, the SourceFunction serves events proportional to
- * their timestamps. In addition, the serving of events can be delayed by a bounded random delay
- * which causes the events to be served slightly out-of-order of their timestamps.
- *
- * <p>The serving speed of the SourceFunction can be adjusted by a serving speed factor.
- * A factor of 60.0 increases the logical serving time by a factor of 60, i.e., events of one
- * minute (60 seconds) are served in 1 second.
- *
- * <p>This SourceFunction is an EventSourceFunction and does continuously emit watermarks.
- * Hence it is able to operate in event time mode which is configured as follows:
- *
- * <code>
- *     StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- * </code>
- */
-public class TaxiFareSource implements SourceFunction<TaxiFare> {
-
-	private final int maxDelayMsecs;
-	private final int watermarkDelayMSecs;
-
-	private final String dataFilePath;
-	private final int servingSpeed;
-
-	private transient BufferedReader reader;
-	private transient InputStream gzipStream;
-
-	/**
-	 * Serves the TaxiFare records from the specified and ordered gzipped input file.
-	 * Rides are served exactly in order of their time stamps
-	 * at the speed at which they were originally generated.
-	 *
-	 * @param dataFilePath The gzipped input file from which the TaxiFare records are read.
-	 */
-	public TaxiFareSource(String dataFilePath) {
-		this(dataFilePath, 0, 1);
-	}
-
-	/**
-	 * Serves the TaxiFare records from the specified and ordered gzipped input file.
-	 * Rides are served exactly in order of their time stamps
-	 * in a serving speed which is proportional to the specified serving speed factor.
-	 *
-	 * @param dataFilePath The gzipped input file from which the TaxiFare records are read.
-	 * @param servingSpeedFactor The serving speed factor by which the logical serving time is adjusted.
-	 */
-	public TaxiFareSource(String dataFilePath, int servingSpeedFactor) {
-		this(dataFilePath, 0, servingSpeedFactor);
-	}
-
-	/**
-	 * Serves the TaxiFare records from the specified and ordered gzipped input file.
-	 * Rides are served out-of time stamp order with specified maximum random delay
-	 * in a serving speed which is proportional to the specified serving speed factor.
-	 *
-	 * @param dataFilePath The gzipped input file from which the TaxiFare records are read.
-	 * @param maxEventDelaySecs The max time in seconds by which events are delayed.
-	 * @param servingSpeedFactor The serving speed factor by which the logical serving time is adjusted.
-	 */
-	public TaxiFareSource(String dataFilePath, int maxEventDelaySecs, int servingSpeedFactor) {
-		if (maxEventDelaySecs < 0) {
-			throw new IllegalArgumentException("Max event delay must be positive");
-		}
-		this.dataFilePath = dataFilePath;
-		this.maxDelayMsecs = maxEventDelaySecs * 1000;
-		this.watermarkDelayMSecs = Math.max(maxDelayMsecs, 10000);
-		this.servingSpeed = servingSpeedFactor;
-	}
-
-	@Override
-	public void run(SourceContext<TaxiFare> sourceContext) throws Exception {
-
-		gzipStream = new GZIPInputStream(new FileInputStream(dataFilePath));
-		reader = new BufferedReader(new InputStreamReader(gzipStream, StandardCharsets.UTF_8));
-
-		generateUnorderedStream(sourceContext);
-
-		this.reader.close();
-		this.reader = null;
-		this.gzipStream.close();
-		this.gzipStream = null;
-
-	}
-
-	private void generateUnorderedStream(SourceContext<TaxiFare> sourceContext) throws Exception {
-
-		long servingStartTime = Calendar.getInstance().getTimeInMillis();
-		long dataStartTime;
-
-		Random rand = new Random(7452);
-		PriorityQueue<Tuple2<Long, Object>> emitSchedule = new PriorityQueue<>(
-				32,
-				Comparator.comparing(o -> o.f0));
-
-		// read first ride and insert it into emit schedule
-		String line;
-		TaxiFare fare;
-		if (reader.ready() && (line = reader.readLine()) != null) {
-			// read first ride
-			fare = TaxiFare.fromString(line);
-			// extract starting timestamp
-			dataStartTime = getEventTime(fare);
-			// get delayed time
-			long delayedEventTime = dataStartTime + getNormalDelayMsecs(rand);
-
-			emitSchedule.add(Tuple2.of(delayedEventTime, fare));
-			// schedule next watermark
-			long watermarkTime = dataStartTime + watermarkDelayMSecs;
-			Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
-			emitSchedule.add(Tuple2.of(watermarkTime, nextWatermark));
-
-		} else {
-			return;
-		}
-
-		// peek at next ride
-		if (reader.ready() && (line = reader.readLine()) != null) {
-			fare = TaxiFare.fromString(line);
-		}
-
-		// read rides one-by-one and emit a random ride from the buffer each time
-		while (emitSchedule.size() > 0 || reader.ready()) {
-
-			// insert all events into schedule that might be emitted next
-			long curNextDelayedEventTime = !emitSchedule.isEmpty() ? emitSchedule.peek().f0 : -1;
-			long rideEventTime = fare != null ? getEventTime(fare) : -1;
-			while (
-					fare != null && (// while there is a ride AND
-						emitSchedule.isEmpty() || // and no ride in schedule OR
-						rideEventTime < curNextDelayedEventTime + maxDelayMsecs) // not enough rides in schedule
-					) {
-				// insert event into emit schedule
-				long delayedEventTime = rideEventTime + getNormalDelayMsecs(rand);
-				emitSchedule.add(Tuple2.of(delayedEventTime, fare));
-
-				// read next ride
-				if (reader.ready() && (line = reader.readLine()) != null) {
-					fare = TaxiFare.fromString(line);
-					rideEventTime = getEventTime(fare);
-				}
-				else {
-					fare = null;
-					rideEventTime = -1;
-				}
-			}
-
-			// emit schedule is updated, emit next element in schedule
-			Tuple2<Long, Object> head = emitSchedule.remove();
-			long delayedEventTime = head.f0;
-
-			long now = Calendar.getInstance().getTimeInMillis();
-			long servingTime = toServingTime(servingStartTime, dataStartTime, delayedEventTime);
-			long waitTime = servingTime - now;
-
-			//noinspection BusyWait
-			Thread.sleep((waitTime > 0) ? waitTime : 0);
-
-			if (head.f1 instanceof TaxiFare) {
-				TaxiFare emitFare = (TaxiFare) head.f1;
-				// emit ride
-				sourceContext.collectWithTimestamp(emitFare, getEventTime(emitFare));
-			}
-			else if (head.f1 instanceof Watermark) {
-				Watermark emitWatermark = (Watermark) head.f1;
-				// emit watermark
-				sourceContext.emitWatermark(emitWatermark);
-				// schedule next watermark
-				long watermarkTime = delayedEventTime + watermarkDelayMSecs;
-				Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
-				emitSchedule.add(Tuple2.of(watermarkTime, nextWatermark));
-			}
-		}
-	}
-
-	protected long toServingTime(long servingStartTime, long dataStartTime, long eventTime) {
-		long dataDiff = eventTime - dataStartTime;
-		return servingStartTime + (dataDiff / this.servingSpeed);
-	}
-
-	protected long getEventTime(TaxiFare fare) {
-		return fare.getEventTime();
-	}
-
-	protected long getNormalDelayMsecs(Random rand) {
-		long delay = -1;
-		long x = maxDelayMsecs / 2;
-		while (delay < 0 || delay > maxDelayMsecs) {
-			delay = (long) (rand.nextGaussian() * x) + x;
-		}
-		return delay;
-	}
-
-	@Override
-	public void cancel() {
-		try {
-			if (this.reader != null) {
-				this.reader.close();
-			}
-			if (this.gzipStream != null) {
-				this.gzipStream.close();
-			}
-		} catch (IOException ioe) {
-			throw new RuntimeException("Could not cancel SourceFunction", ioe);
-		} finally {
-			this.reader = null;
-			this.gzipStream = null;
-		}
-	}
-
-}
-
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
new file mode 100644
index 0000000..8ee5d01
--- /dev/null
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideGenerator.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.common.sources;
+
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Random;
+
+/**
+ * This SourceFunction generates a data stream of TaxiRide records that include event time
+ * timestamps.
+ *
+ * <p>The stream is produced out-of-order, and includes Watermarks (with no late events).
+ *
+ */
+public class TaxiRideGenerator implements SourceFunction<TaxiRide> {
+
+	public static final int SLEEP_MILLIS_PER_EVENT = 10;
+	private static final int BATCH_SIZE = 5;
+	private volatile boolean running = true;
+
+	@Override
+	public void run(SourceContext<TaxiRide> ctx) throws Exception {
+
+		PriorityQueue<TaxiRide> endEventQ = new PriorityQueue<>(100);
+		long id = 0;
+		long maxStartTime = 0;
+
+		while (running) {
+
+			// generate a batch of START events
+			List<TaxiRide> startEvents = new ArrayList<>(BATCH_SIZE);
+			for (int i = 1; i <= BATCH_SIZE; i++) {
+				TaxiRide ride = new TaxiRide(id + i, true);
+				startEvents.add(ride);
+				// the start times may be in order, but let's not assume that
+				maxStartTime = Math.max(maxStartTime, ride.startTime.toEpochMilli());
+			}
+
+			// enqueue the corresponding END events
+			for (int i = 1; i <= BATCH_SIZE; i++) {
+				endEventQ.add(new TaxiRide(id + i, false));
+			}
+
+			// release the END events coming before the end of this new batch
+			// (this allows a few END events to precede their matching START event)
+			while (!endEventQ.isEmpty() && endEventQ.peek().getEventTime() <= maxStartTime) {
+				TaxiRide ride = endEventQ.poll();
+				ctx.collectWithTimestamp(ride, ride.getEventTime());
+			}
+
+			// then emit the new START events (out-of-order)
+			java.util.Collections.shuffle(startEvents, new Random(id));
+			startEvents.forEach(r -> ctx.collectWithTimestamp(r, r.getEventTime()));
+
+			// produce a Watermark
+			ctx.emitWatermark(new Watermark(maxStartTime));
+
+			// prepare for the next batch
+			id += BATCH_SIZE;
+
+			// don't go too fast
+			Thread.sleep(BATCH_SIZE * SLEEP_MILLIS_PER_EVENT);
+		}
+	}
+
+	@Override
+	public void cancel() {
+		running = false;
+	}
+}
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideSource.java b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideSource.java
deleted file mode 100644
index b19a310..0000000
--- a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideSource.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.training.exercises.common.sources;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-
-import java.io.BufferedReader;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.nio.charset.StandardCharsets;
-import java.util.Calendar;
-import java.util.Comparator;
-import java.util.PriorityQueue;
-import java.util.Random;
-import java.util.zip.GZIPInputStream;
-
-/**
- * This SourceFunction generates a data stream of TaxiRide records which are
- * read from a gzipped input file. Each record has a time stamp and the input file must be
- * ordered by this time stamp.
- *
- * <p>In order to simulate a realistic stream source, the SourceFunction serves events proportional to
- * their timestamps. In addition, the serving of events can be delayed by a bounded random delay
- * which causes the events to be served slightly out-of-order of their timestamps.
- *
- * <p>The serving speed of the SourceFunction can be adjusted by a serving speed factor.
- * A factor of 60.0 increases the logical serving time by a factor of 60, i.e., events of one
- * minute (60 seconds) are served in 1 second.
- *
- * <p>This SourceFunction is an EventSourceFunction and does continuously emit watermarks.
- * Hence it is able to operate in event time mode which is configured as follows:
- *
- * <code>
- *     StreamExecutionEnvironment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
- * </code>
- */
-public class TaxiRideSource implements SourceFunction<TaxiRide> {
-
-	private final int maxDelayMsecs;
-	private final int watermarkDelayMSecs;
-
-	private final String dataFilePath;
-	private final int servingSpeed;
-
-	private transient BufferedReader reader;
-	private transient InputStream gzipStream;
-
-	/**
-	 * Serves the TaxiRide records from the specified and ordered gzipped input file.
-	 * Rides are served exactly in order of their time stamps
-	 * at the speed at which they were originally generated.
-	 *
-	 * @param dataFilePath The gzipped input file from which the TaxiRide records are read.
-	 */
-	public TaxiRideSource(String dataFilePath) {
-		this(dataFilePath, 0, 1);
-	}
-
-	/**
-	 * Serves the TaxiRide records from the specified and ordered gzipped input file.
-	 * Rides are served exactly in order of their time stamps
-	 * in a serving speed which is proportional to the specified serving speed factor.
-	 *
-	 * @param dataFilePath The gzipped input file from which the TaxiRide records are read.
-	 * @param servingSpeedFactor The serving speed factor by which the logical serving time is adjusted.
-	 */
-	public TaxiRideSource(String dataFilePath, int servingSpeedFactor) {
-		this(dataFilePath, 0, servingSpeedFactor);
-	}
-
-	/**
-	 * Serves the TaxiRide records from the specified and ordered gzipped input file.
-	 * Rides are served out-of time stamp order with specified maximum random delay
-	 * in a serving speed which is proportional to the specified serving speed factor.
-	 *
-	 * @param dataFilePath The gzipped input file from which the TaxiRide records are read.
-	 * @param maxEventDelaySecs The max time in seconds by which events are delayed.
-	 * @param servingSpeedFactor The serving speed factor by which the logical serving time is adjusted.
-	 */
-	public TaxiRideSource(String dataFilePath, int maxEventDelaySecs, int servingSpeedFactor) {
-		if (maxEventDelaySecs < 0) {
-			throw new IllegalArgumentException("Max event delay must be positive");
-		}
-		this.dataFilePath = dataFilePath;
-		this.maxDelayMsecs = maxEventDelaySecs * 1000;
-		this.watermarkDelayMSecs = Math.max(maxDelayMsecs, 10000);
-		this.servingSpeed = servingSpeedFactor;
-	}
-
-	@Override
-	public void run(SourceContext<TaxiRide> sourceContext) throws Exception {
-
-		gzipStream = new GZIPInputStream(new FileInputStream(dataFilePath));
-		reader = new BufferedReader(new InputStreamReader(gzipStream, StandardCharsets.UTF_8));
-
-		generateUnorderedStream(sourceContext);
-
-		this.reader.close();
-		this.reader = null;
-		this.gzipStream.close();
-		this.gzipStream = null;
-
-	}
-
-	private void generateUnorderedStream(SourceContext<TaxiRide> sourceContext) throws Exception {
-
-		long servingStartTime = Calendar.getInstance().getTimeInMillis();
-		long dataStartTime;
-
-		Random rand = new Random(7452);
-		PriorityQueue<Tuple2<Long, Object>> emitSchedule = new PriorityQueue<>(
-				32,
-				Comparator.comparing(o -> o.f0));
-
-		// read first ride and insert it into emit schedule
-		String line;
-		TaxiRide ride;
-		if (reader.ready() && (line = reader.readLine()) != null) {
-			// read first ride
-			ride = TaxiRide.fromString(line);
-			// extract starting timestamp
-			dataStartTime = getEventTime(ride);
-			// get delayed time
-			long delayedEventTime = dataStartTime + getNormalDelayMsecs(rand);
-
-			emitSchedule.add(Tuple2.of(delayedEventTime, ride));
-			// schedule next watermark
-			long watermarkTime = dataStartTime + watermarkDelayMSecs;
-			Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
-			emitSchedule.add(Tuple2.of(watermarkTime, nextWatermark));
-
-		} else {
-			return;
-		}
-
-		// peek at next ride
-		if (reader.ready() && (line = reader.readLine()) != null) {
-			ride = TaxiRide.fromString(line);
-		}
-
-		// read rides one-by-one and emit a random ride from the buffer each time
-		while (emitSchedule.size() > 0 || reader.ready()) {
-
-			// insert all events into schedule that might be emitted next
-			long curNextDelayedEventTime = !emitSchedule.isEmpty() ? emitSchedule.peek().f0 : -1;
-			long rideEventTime = ride != null ? getEventTime(ride) : -1;
-			while (
-					ride != null && (// while there is a ride AND
-						emitSchedule.isEmpty() || // and no ride in schedule OR
-						rideEventTime < curNextDelayedEventTime + maxDelayMsecs) // not enough rides in schedule
-					) {
-				// insert event into emit schedule
-				long delayedEventTime = rideEventTime + getNormalDelayMsecs(rand);
-				emitSchedule.add(Tuple2.of(delayedEventTime, ride));
-
-				// read next ride
-				if (reader.ready() && (line = reader.readLine()) != null) {
-					ride = TaxiRide.fromString(line);
-					rideEventTime = getEventTime(ride);
-				}
-				else {
-					ride = null;
-					rideEventTime = -1;
-				}
-			}
-
-			// emit schedule is updated, emit next element in schedule
-			Tuple2<Long, Object> head = emitSchedule.remove();
-			long delayedEventTime = head.f0;
-
-			long now = Calendar.getInstance().getTimeInMillis();
-			long servingTime = toServingTime(servingStartTime, dataStartTime, delayedEventTime);
-			long waitTime = servingTime - now;
-
-			//noinspection BusyWait
-			Thread.sleep((waitTime > 0) ? waitTime : 0);
-
-			if (head.f1 instanceof TaxiRide) {
-				TaxiRide emitRide = (TaxiRide) head.f1;
-				// emit ride
-				sourceContext.collectWithTimestamp(emitRide, getEventTime(emitRide));
-			}
-			else if (head.f1 instanceof Watermark) {
-				Watermark emitWatermark = (Watermark) head.f1;
-				// emit watermark
-				sourceContext.emitWatermark(emitWatermark);
-				// schedule next watermark
-				long watermarkTime = delayedEventTime + watermarkDelayMSecs;
-				Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
-				emitSchedule.add(Tuple2.of(watermarkTime, nextWatermark));
-			}
-		}
-	}
-
-	protected long toServingTime(long servingStartTime, long dataStartTime, long eventTime) {
-		long dataDiff = eventTime - dataStartTime;
-		return servingStartTime + (dataDiff / this.servingSpeed);
-	}
-
-	protected long getEventTime(TaxiRide ride) {
-		return ride.getEventTime();
-	}
-
-	protected long getNormalDelayMsecs(Random rand) {
-		long delay = -1;
-		long x = maxDelayMsecs / 2;
-		while (delay < 0 || delay > maxDelayMsecs) {
-			delay = (long) (rand.nextGaussian() * x) + x;
-		}
-		return delay;
-	}
-
-	@Override
-	public void cancel() {
-		try {
-			if (this.reader != null) {
-				this.reader.close();
-			}
-			if (this.gzipStream != null) {
-				this.gzipStream.close();
-			}
-		} catch (IOException ioe) {
-			throw new RuntimeException("Could not cancel SourceFunction", ioe);
-		} finally {
-			this.reader = null;
-			this.gzipStream = null;
-		}
-	}
-
-}
-
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java b/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java
new file mode 100644
index 0000000..f40c3c4
--- /dev/null
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/utils/DataGenerator.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.training.exercises.common.utils;
+
+import java.time.Instant;
+import java.util.Random;
+
+/**
+ * Data generator for the fields in TaxiRide and TaxiFare objects.
+ *
+ * <p>Results are determined entirely by the rideId. This guarantees (among other things)
+ * that the startTime for a TaxiRide START event matches the startTime for the TaxiRide END and
+ * TaxiFare events for that same rideId.
+ */
+public class DataGenerator {
+
+	private static final int SECONDS_BETWEEN_RIDES = 20;
+	private static final int NUMBER_OF_DRIVERS = 200;
+	private static final Instant beginTime = Instant.parse("2020-01-01T12:00:00.00Z");
+
+	private final long rideId;
+
+	/**
+	 * Creates a DataGenerator for the specified rideId.
+	 */
+	public DataGenerator(long rideId) {
+		this.rideId = rideId;
+	}
+
+	/**
+	 * Deterministically generates and returns the startTime for this ride.
+	 */
+	public Instant startTime() {
+		return beginTime.plusSeconds(SECONDS_BETWEEN_RIDES * rideId);
+	}
+
+	/**
+	 * Deterministically generates and returns the endTime for this ride.
+	 */
+	public Instant endTime() {
+		return startTime().plusSeconds(60 * rideDurationMinutes());
+	}
+
+	/**
+	 * Deterministically generates and returns the driverId for this ride.
+	 * The HourlyTips exercise is more interesting if there aren't too many drivers.
+	 */
+	public long driverId() {
+		Random rnd = new Random(rideId);
+		return 2013000000 + rnd.nextInt(NUMBER_OF_DRIVERS);
+	}
+
+	/**
+	 * Deterministically generates and returns the taxiId for this ride.
+	 */
+	public long taxiId() {
+		return driverId();
+	}
+
+	/**
+	 * Deterministically generates and returns the startLat for this ride.
+	 *
+	 * <p>The locations are used in the RideCleansing exercise.
+	 * We want some rides to be outside of NYC.
+	 */
+	public float startLat() {
+		return aFloat((float) (GeoUtils.LAT_SOUTH - 0.1), (float) (GeoUtils.LAT_NORTH + 0.1F));
+	}
+
+	/**
+	 * Deterministically generates and returns the startLon for this ride.
+	 */
+	public float startLon() {
+		return aFloat((float) (GeoUtils.LON_WEST - 0.1), (float) (GeoUtils.LON_EAST + 0.1F));
+	}
+
+	/**
+	 * Deterministically generates and returns the endLat for this ride.
+	 */
+	public float endLat() {
+		return bFloat((float) (GeoUtils.LAT_SOUTH - 0.1), (float) (GeoUtils.LAT_NORTH + 0.1F));
+	}
+
+	/**
+	 * Deterministically generates and returns the endLon for this ride.
+	 */
+	public float endLon() {
+		return bFloat((float) (GeoUtils.LON_WEST - 0.1), (float) (GeoUtils.LON_EAST + 0.1F));
+	}
+
+	/**
+	 * Deterministically generates and returns the passengerCnt for this ride.
+	 */
+	public short passengerCnt() {
+		return (short) aLong(1L, 4L);
+	}
+
+	/**
+	 * Deterministically generates and returns the paymentType for this ride.
+	 */
+	public String paymentType() {
+		return (rideId % 2 == 0) ? "CARD" : "CASH";
+	}
+
+	/**
+	 * Deterministically generates and returns the tip for this ride.
+	 *
+	 * <p>The HourlyTips exercise is more interesting if there's some significant variation in tipping.
+	 */
+	public float tip() {
+		return aLong(0L, 60L, 10F, 15F);
+	}
+
+	/**
+	 * Deterministically generates and returns the tolls for this ride.
+	 */
+	public float tolls() {
+		return (rideId % 10 == 0) ? aLong(0L, 5L) : 0L;
+	}
+
+	/**
+	 * Deterministically generates and returns the totalFare for this ride.
+	 */
+	public float totalFare() {
+		return (float) (3.0 + (1.0 * rideDurationMinutes()) + tip() + tolls());
+	}
+
+	/**
+	 * The LongRides exercise needs to have some rides with a duration > 2 hours, but not too many.
+	 */
+	private long rideDurationMinutes() {
+		return aLong(0L, 600, 20, 40);
+	}
+
+	// -------------------------------------
+
+	private long aLong(long min, long max) {
+		float mean = (min + max) / 2.0F;
+		float stddev = (max - min) / 8F;
+
+		return aLong(min, max, mean, stddev);
+	}
+
+	// the rideId is used as the seed to guarantee deterministic results
+	private long aLong(long min, long max, float mean, float stddev) {
+		Random rnd = new Random(rideId);
+		long value;
+		do {
+			value = Math.round((stddev * rnd.nextGaussian()) + mean);
+		} while ((value < min) || (value > max));
+		return value;
+	}
+
+	// -------------------------------------
+
+	private float aFloat(float min, float max) {
+		float mean = (min + max) / 2.0F;
+		float stddev = (max - min) / 8F;
+
+		return aFloat(rideId, min, max, mean, stddev);
+	}
+
+	private float bFloat(float min, float max) {
+		float mean = (min + max) / 2.0F;
+		float stddev = (max - min) / 8F;
+
+		return aFloat(rideId + 42, min, max, mean, stddev);
+	}
+
+	// the seed is derived from the rideId to guarantee deterministic results
+	private float aFloat(long seed, float min, float max, float mean, float stddev) {
+		Random rnd = new Random(seed);
+		float value;
+		do {
+			value = (float) (stddev * rnd.nextGaussian()) + mean;
+		} while ((value < min) || (value > max));
+		return value;
+	}
+
+}
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/utils/ExerciseBase.java b/common/src/main/java/org/apache/flink/training/exercises/common/utils/ExerciseBase.java
index 3440a15..172b300 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/utils/ExerciseBase.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/utils/ExerciseBase.java
@@ -33,9 +33,6 @@
 	public static SinkFunction out = null;
 	public static int parallelism = 4;
 
-	public static final String PATH_TO_RIDE_DATA = "/Users/david/stuff/flink-training/trainingData/nycTaxiRides.gz";
-	public static final String PATH_TO_FARE_DATA = "/Users/david/stuff/flink-training/trainingData/nycTaxiFares.gz";
-
 	/**
 	 * Retrieves a test source during unit tests and the given one during normal execution.
 	 */
diff --git a/hourly-tips/DISCUSSION.md b/hourly-tips/DISCUSSION.md
index c1571c5..6c9541a 100644
--- a/hourly-tips/DISCUSSION.md
+++ b/hourly-tips/DISCUSSION.md
@@ -73,7 +73,7 @@
 
 to compute `hourlyTips`.
 
-Having computed `hourlyTips`, it is a good idea to take a look at what this stream looks like. `hourlyTips.print()` yields this,
+Having computed `hourlyTips`, it is a good idea to take a look at what this stream looks like. `hourlyTips.print()` yields something like this,
 
 ```
 1> (1357002000000,2013000019,1.0)
diff --git a/hourly-tips/README.md b/hourly-tips/README.md
index d57ab9e..396d609 100644
--- a/hourly-tips/README.md
+++ b/hourly-tips/README.md
@@ -25,9 +25,9 @@
 
 ### Input Data
 
-The input data of this exercise is a stream of `TaxiFare` events generated by the [Taxi Fare Stream Source](../README.md#using-the-taxi-data-streams).
+The input data of this exercise is a stream of `TaxiFare` events generated by the [Taxi Fare Stream Generator](../README.md#using-the-taxi-data-streams).
 
-The `TaxiFareSource` annotates the generated `DataStream<TaxiFare>` with timestamps and watermarks. Hence, there is no need to provide a custom timestamp and watermark assigner in order to correctly use event time.
+The `TaxiFareGenerator` annotates the generated `DataStream<TaxiFare>` with timestamps and watermarks. Hence, there is no need to provide a custom timestamp and watermark assigner in order to correctly use event time.
 
 ### Expected Output
 
diff --git a/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java b/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java
index b926de1..431543f 100644
--- a/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java
+++ b/hourly-tips/src/main/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsExercise.java
@@ -18,12 +18,11 @@
 
 package org.apache.flink.training.exercises.hourlytips;
 
-import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
-import org.apache.flink.training.exercises.common.sources.TaxiFareSource;
+import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
 import org.apache.flink.training.exercises.common.utils.ExerciseBase;
 import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
 
@@ -33,35 +32,23 @@
  * <p>The task of the exercise is to first calculate the total tips collected by each driver, hour by hour, and
  * then from that stream, find the highest tip total in each hour.
  *
- * <p>Parameters:
- * -input path-to-input-file
  */
 public class HourlyTipsExercise extends ExerciseBase {
 
 	/**
 	 * Main method.
 	 *
-	 * <p>Parameters:
-	 * -input path-to-input-file
-	 *
 	 * @throws Exception which occurs during job execution.
 	 */
 	public static void main(String[] args) throws Exception {
 
-		// read parameters
-		ParameterTool params = ParameterTool.fromArgs(args);
-		final String input = params.get("input", ExerciseBase.PATH_TO_FARE_DATA);
-
-		final int maxEventDelay = 60;       // events are out of order by max 60 seconds
-		final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second
-
 		// set up streaming execution environment
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.setParallelism(ExerciseBase.parallelism);
 
 		// start the data generator
-		DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareSource(input, maxEventDelay, servingSpeedFactor)));
+		DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()));
 
 		throw new MissingSolutionException();
 
diff --git a/hourly-tips/src/main/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsExercise.scala b/hourly-tips/src/main/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsExercise.scala
index ec38975..0bba57c 100644
--- a/hourly-tips/src/main/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsExercise.scala
+++ b/hourly-tips/src/main/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsExercise.scala
@@ -18,10 +18,9 @@
 
 package org.apache.flink.training.exercises.hourlytips.scala
 
-import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala._
-import org.apache.flink.training.exercises.common.sources.TaxiFareSource
+import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator
 import org.apache.flink.training.exercises.common.utils.ExerciseBase._
 import org.apache.flink.training.exercises.common.utils.{ExerciseBase, MissingSolutionException}
 
@@ -31,27 +30,18 @@
   * The task of the exercise is to first calculate the total tips collected by each driver, hour by hour, and
   * then from that stream, find the highest tip total in each hour.
   *
-  * Parameters:
-  * -input path-to-input-file
   */
 object HourlyTipsExercise {
 
   def main(args: Array[String]) {
 
-    // read parameters
-    val params = ParameterTool.fromArgs(args)
-    val input = params.get("input", ExerciseBase.PATH_TO_FARE_DATA)
-
-    val maxDelay = 60 // events are delayed by at most 60 seconds
-    val speed = 600   // events of 10 minutes are served in 1 second
-
     // set up streaming execution environment
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setParallelism(ExerciseBase.parallelism)
 
     // start the data generator
-    val fares = env.addSource(fareSourceOrTest(new TaxiFareSource(input, maxDelay, speed)))
+    val fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()))
 
     throw new MissingSolutionException()
 
diff --git a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
index ed01320..b6d9021 100644
--- a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
+++ b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
@@ -19,7 +19,6 @@
 package org.apache.flink.training.solutions.hourlytips;
 
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -28,7 +27,7 @@
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
-import org.apache.flink.training.exercises.common.sources.TaxiFareSource;
+import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
 import org.apache.flink.training.exercises.common.utils.ExerciseBase;
 import org.apache.flink.util.Collector;
 
@@ -38,35 +37,23 @@
  * <p>The task of the exercise is to first calculate the total tips collected by each driver, hour by hour, and
  * then from that stream, find the highest tip total in each hour.
  *
- * <p>Parameters:
- * -input path-to-input-file
  */
 public class HourlyTipsSolution extends ExerciseBase {
 
 	/**
 	 * Main method.
 	 *
-	 * <p>Parameters:
-	 * -input path-to-input-file
-	 *
 	 * @throws Exception which occurs during job execution.
 	 */
 	public static void main(String[] args) throws Exception {
 
-		// read parameters
-		ParameterTool params = ParameterTool.fromArgs(args);
-		final String input = params.get("input", ExerciseBase.PATH_TO_FARE_DATA);
-
-		final int maxEventDelay = 60;       // events are out of order by max 60 seconds
-		final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second
-
 		// set up streaming execution environment
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.setParallelism(ExerciseBase.parallelism);
 
 		// start the data generator
-		DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareSource(input, maxEventDelay, servingSpeedFactor)));
+		DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()));
 
 		// compute tips per hour for each driver
 		DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
@@ -96,6 +83,7 @@
 	 */
 	public static class AddTips extends ProcessWindowFunction<
 			TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {
+
 		@Override
 		public void process(Long key, Context context, Iterable<TaxiFare> fares, Collector<Tuple3<Long, Long, Float>> out) {
 			float sumOfTips = 0F;
diff --git a/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala b/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
index 06db503..3d43928 100644
--- a/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
+++ b/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
@@ -18,7 +18,6 @@
 
 package org.apache.flink.training.solutions.hourlytips.scala
 
-import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
@@ -26,7 +25,7 @@
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
 import org.apache.flink.training.exercises.common.datatypes.TaxiFare
-import org.apache.flink.training.exercises.common.sources.TaxiFareSource
+import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator
 import org.apache.flink.training.exercises.common.utils.ExerciseBase
 import org.apache.flink.training.exercises.common.utils.ExerciseBase._
 import org.apache.flink.util.Collector
@@ -37,27 +36,18 @@
   * The task of the exercise is to first calculate the total tips collected by each driver, hour by hour, and
   * then from that stream, find the highest tip total in each hour.
   *
-  * Parameters:
-  * -input path-to-input-file
   */
 object HourlyTipsSolution {
 
   def main(args: Array[String]) {
 
-    // read parameters
-    val params = ParameterTool.fromArgs(args)
-    val input = params.get("input", ExerciseBase.PATH_TO_FARE_DATA)
-
-    val maxDelay = 60 // events are delayed by at most 60 seconds
-    val speed = 600   // events of 10 minutes are served in 1 second
-
     // set up streaming execution environment
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setParallelism(ExerciseBase.parallelism)
 
     // start the data generator
-    val fares = env.addSource(fareSourceOrTest(new TaxiFareSource(input, maxDelay, speed)))
+    val fares = env.addSource(fareSourceOrTest(new TaxiFareGenerator()))
 
     // total tips per hour by driver
     val hourlyTips = fares
diff --git a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
index eef1bdf..2924a21 100644
--- a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
+++ b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
@@ -23,10 +23,9 @@
 import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
 import org.apache.flink.training.solutions.hourlytips.HourlyTipsSolution;
 
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
 import org.junit.Test;
 
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
@@ -45,7 +44,7 @@
 				one
 		);
 
-		Tuple3<Long, Long, Float> max = Tuple3.of(t(60), 1L, 1.0F);
+		Tuple3<Long, Long, Float> max = Tuple3.of(t(60).toEpochMilli(), 1L, 1.0F);
 
 		List<Tuple3<Long, Long, Float>> expected = Collections.singletonList(max);
 
@@ -64,8 +63,8 @@
 				tenIn2
 		);
 
-		Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60), 1L, 6.0F);
-		Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120), 1L, 10.0F);
+		Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60).toEpochMilli(), 1L, 6.0F);
+		Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120).toEpochMilli(), 1L, 10.0F);
 
 		List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, hour2);
 
@@ -86,20 +85,20 @@
 				twentyFor2In2
 		);
 
-		Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60), 1L, 6.0F);
-		Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120), 2L, 20.0F);
+		Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60).toEpochMilli(), 1L, 6.0F);
+		Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120).toEpochMilli(), 2L, 20.0F);
 
 		List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, hour2);
 
 		assertEquals(expected, results(source));
 	}
 
-	private long t(int n) {
-		return new DateTime(2000, 1, 1, 0, 0, DateTimeZone.UTC).plusMinutes(n).getMillis();
+	private Instant t(int minutes) {
+		return Instant.parse("2020-01-01T12:00:00.00Z").plusSeconds(60 * minutes);
 	}
 
-	private TaxiFare testFare(long driverId, long startTime, float tip) {
-		return new TaxiFare(0, 0, driverId, new DateTime(startTime), "", tip, 0F, 0F);
+	private TaxiFare testFare(long driverId, Instant startTime, float tip) {
+		return new TaxiFare(0, 0, driverId, startTime, "", tip, 0F, 0F);
 	}
 
 	protected List<Tuple3<Long, Long, Float>> results(TestFareSource source) throws Exception {
diff --git a/long-ride-alerts/README.md b/long-ride-alerts/README.md
index 7c6527e..f6514a5 100644
--- a/long-ride-alerts/README.md
+++ b/long-ride-alerts/README.md
@@ -19,32 +19,30 @@
 
 # Lab: `ProcessFunction` and Timers (Long Ride Alerts)
 
-The goal of the "Long Ride Alerts" exercise is to indicate whenever a taxi ride started two hours ago, and is still ongoing.
+The goal of the "Long Ride Alerts" exercise is to provide a real-time warning whenever a taxi ride
+that started two hours ago is still ongoing.
+
+This should be done using the event time timestamps and watermarks that are provided in the data stream.
+
+The stream is out of order, so the END event for a ride may be processed before
+its START event. In such cases there is no need to create an alert, since the ride
+is already known to have ended.
 
 ### Input Data
 
-The input data of this exercise is a `DataStream` of taxi ride events. You will want to use a `TaxiRideSource`, as described in the page about the [Taxi Data Stream](../README.md#using-the-taxi-data-streams).
-
-You can filter the events to only include rides within New York City (as is done in the [Taxi Ride Cleansing exercise](../ride-cleansing)), but it is not essential for this lab.
+The input data of this exercise is a `DataStream` of taxi ride events.
 
 ### Expected Output
 
-The result of the exercise should be a `DataStream<TaxiRide>` that only contains START events of taxi rides which have no matching END event within the first two hours of the ride.
+The goal of this exercise is _not_ to find all rides that lasted for more than two hours, but rather
+to create an alert _in real time_ at the moment it becomes known that a ride has been going on for
+more than two hours.
+
+The result of the exercise should be a `DataStream<TaxiRide>` that only contains START events of
+taxi rides that started two hours earlier, and whose END event hasn't yet arrived.
 
 The resulting stream should be printed to standard out.
 
-Here are the `rideIds` and start times of the first few rides that go on for more than two hours, but you might want to print other info as well:
-
-~~~
-> 2758,2013-01-01 00:10:13
-> 7575,2013-01-01 00:20:23
-> 22131,2013-01-01 00:47:03
-> 25473,2013-01-01 00:53:10
-> 29907,2013-01-01 01:01:15
-> 30796,2013-01-01 01:03:00
-...
-~~~
-
 ## Getting Started
 
 > :information_source: Rather than following these links to the sources, you might prefer to open these classes in your IDE.
diff --git a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
index ba48f50..eff8e56 100644
--- a/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
+++ b/long-ride-alerts/src/main/java/org/apache/flink/training/exercises/longrides/LongRidesExercise.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.training.exercises.longrides;
 
-import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.TimerService;
@@ -26,7 +25,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.common.sources.TaxiRideSource;
+import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
 import org.apache.flink.training.exercises.common.utils.ExerciseBase;
 import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
 import org.apache.flink.util.Collector;
@@ -37,34 +36,23 @@
  * <p>The goal for this exercise is to emit START events for taxi rides that have not been matched
  * by an END event during the first 2 hours of the ride.
  *
- * <p>Parameters:
- * -input path-to-input-file
  */
 public class LongRidesExercise extends ExerciseBase {
 
 	/**
 	 * Main method.
 	 *
-	 * <p>Parameters:
-	 * -input path-to-input-file
-	 *
 	 * @throws Exception which occurs during job execution.
 	 */
 	public static void main(String[] args) throws Exception {
 
-		ParameterTool params = ParameterTool.fromArgs(args);
-		final String input = params.get("input", ExerciseBase.PATH_TO_RIDE_DATA);
-
-		final int maxEventDelay = 60;       // events are out of order by max 60 seconds
-		final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second
-
 		// set up streaming execution environment
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.setParallelism(ExerciseBase.parallelism);
 
 		// start the data generator
-		DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));
+		DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator()));
 
 		DataStream<TaxiRide> longRides = rides
 				.keyBy((TaxiRide ride) -> ride.rideId)
diff --git a/long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala b/long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala
index 49deb45..5bfc912 100644
--- a/long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala
+++ b/long-ride-alerts/src/main/scala/org/apache/flink/training/exercises/longrides/scala/LongRidesExercise.scala
@@ -18,13 +18,12 @@
 
 package org.apache.flink.training.exercises.longrides.scala
 
-import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.configuration.Configuration
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide
-import org.apache.flink.training.exercises.common.sources.TaxiRideSource
+import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator
 import org.apache.flink.training.exercises.common.utils.ExerciseBase._
 import org.apache.flink.training.exercises.common.utils.{ExerciseBase, MissingSolutionException}
 import org.apache.flink.util.Collector
@@ -35,26 +34,17 @@
   * The goal for this exercise is to emit START events for taxi rides that have not been matched
   * by an END event during the first 2 hours of the ride.
   *
-  * Parameters:
-  * -input path-to-input-file
   */
 object LongRidesExercise {
 
   def main(args: Array[String]) {
 
-    // parse parameters
-    val params = ParameterTool.fromArgs(args)
-    val input = params.get("input", ExerciseBase.PATH_TO_RIDE_DATA)
-
-    val maxDelay = 60     // events are out of order by max 60 seconds
-    val speed = 1800      // events of 30 minutes are served every second
-
     // set up the execution environment
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setParallelism(ExerciseBase.parallelism)
 
-    val rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxDelay, speed)))
+    val rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
 
     val longRides = rides
       .keyBy(_.rideId)
diff --git a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
index 37ef758..dc6ac2e 100644
--- a/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
+++ b/long-ride-alerts/src/solution/java/org/apache/flink/training/solutions/longrides/LongRidesSolution.java
@@ -20,7 +20,6 @@
 
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.TimerService;
@@ -28,7 +27,7 @@
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.common.sources.TaxiRideSource;
+import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
 import org.apache.flink.training.exercises.common.utils.ExerciseBase;
 import org.apache.flink.util.Collector;
 
@@ -38,34 +37,23 @@
  * <p>The goal for this exercise is to emit START events for taxi rides that have not been matched
  * by an END event during the first 2 hours of the ride.
  *
- * <p>Parameters:
- * -input path-to-input-file
  */
 public class LongRidesSolution extends ExerciseBase {
 
 	/**
 	 * Main method.
 	 *
-	 * <p>Parameters:
-	 * -input path-to-input-file
-	 *
 	 * @throws Exception which occurs during job execution.
 	 */
 	public static void main(String[] args) throws Exception {
 
-		ParameterTool params = ParameterTool.fromArgs(args);
-		final String input = params.get("input", ExerciseBase.PATH_TO_RIDE_DATA);
-
-		final int maxEventDelay = 60;       // events are out of order by max 60 seconds
-		final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second
-
 		// set up streaming execution environment
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.setParallelism(ExerciseBase.parallelism);
 
 		// start the data generator
-		DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));
+		DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator()));
 
 		DataStream<TaxiRide> longRides = rides
 				.keyBy((TaxiRide ride) -> ride.rideId)
diff --git a/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala b/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
index 29e854c..beff3eb 100644
--- a/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
+++ b/long-ride-alerts/src/solution/scala/org/apache/flink/training/solutions/longrides/scala/LongRidesSolution.scala
@@ -19,12 +19,11 @@
 package org.apache.flink.training.solutions.longrides.scala
 
 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
-import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.functions.KeyedProcessFunction
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide
-import org.apache.flink.training.exercises.common.sources.TaxiRideSource
+import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator
 import org.apache.flink.training.exercises.common.utils.ExerciseBase
 import org.apache.flink.training.exercises.common.utils.ExerciseBase._
 import org.apache.flink.util.Collector
@@ -35,27 +34,18 @@
   * The goal for this exercise is to emit START events for taxi rides that have not been matched
   * by an END event during the first 2 hours of the ride.
   *
-  * Parameters:
-  * -input path-to-input-file
   */
 object LongRidesSolution {
 
   def main(args: Array[String]) {
 
-    // parse parameters
-    val params = ParameterTool.fromArgs(args)
-    val input = params.get("input", PATH_TO_RIDE_DATA)
-
-    val maxDelay = 60     // events are out of order by max 60 seconds
-    val speed = 1800      // events of 30 minutes are served every second
-
     // set up the execution environment
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     // operate in Event-time
     env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
     env.setParallelism(ExerciseBase.parallelism)
 
-    val rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxDelay, speed)))
+    val rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
 
     val longRides = rides
       .keyBy(_.rideId)
diff --git a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
index ba3e6b6..c0646c6 100644
--- a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
+++ b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
@@ -22,9 +22,9 @@
 import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
 import org.apache.flink.training.solutions.longrides.LongRidesSolution;
 
-import org.joda.time.DateTime;
 import org.junit.Test;
 
+import java.time.Instant;
 import java.util.Collections;
 import java.util.List;
 
@@ -34,14 +34,14 @@
 
 	static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new String[]{});
 
-	private static final DateTime BEGINNING = new DateTime(2000, 1, 1, 0, 0);
+	private static final Instant BEGINNING = Instant.parse("2020-01-01T12:00:00.00Z");
 
 	@Test
 	public void shortRide() throws Exception {
-		DateTime oneMinLater = BEGINNING.plusMinutes(1);
+		Instant oneMinLater = BEGINNING.plusSeconds(60);
 		TaxiRide rideStarted = startRide(1, BEGINNING);
 		TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-		Long markOneMinLater = oneMinLater.getMillis();
+		Long markOneMinLater = oneMinLater.toEpochMilli();
 
 		TestRideSource source = new TestRideSource(rideStarted, endedOneMinLater, markOneMinLater);
 		assert(results(source).isEmpty());
@@ -49,10 +49,10 @@
 
 	@Test
 	public void outOfOrder() throws Exception {
-		DateTime oneMinLater = BEGINNING.plusMinutes(1);
+		Instant oneMinLater = BEGINNING.plusSeconds(60);
 		TaxiRide rideStarted = startRide(1, BEGINNING);
 		TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-		Long markOneMinLater = oneMinLater.getMillis();
+		Long markOneMinLater = oneMinLater.toEpochMilli();
 
 		TestRideSource source = new TestRideSource(endedOneMinLater, rideStarted, markOneMinLater);
 		assert(results(source).isEmpty());
@@ -60,10 +60,10 @@
 
 	@Test
 	public void noStartShort() throws Exception {
-		DateTime oneMinLater = BEGINNING.plusMinutes(1);
+		Instant oneMinLater = BEGINNING.plusSeconds(60);
 		TaxiRide rideStarted = startRide(1, BEGINNING);
 		TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
-		Long markOneMinLater = oneMinLater.getMillis();
+		Long markOneMinLater = oneMinLater.toEpochMilli();
 
 		TestRideSource source = new TestRideSource(endedOneMinLater, markOneMinLater);
 		assert(results(source).isEmpty());
@@ -72,7 +72,7 @@
 	@Test
 	public void noEnd() throws Exception {
 		TaxiRide rideStarted = startRide(1, BEGINNING);
-		Long markThreeHoursLater = BEGINNING.plusHours(3).getMillis();
+		Long markThreeHoursLater = BEGINNING.plusSeconds(180 * 60).toEpochMilli();
 
 		TestRideSource source = new TestRideSource(rideStarted, markThreeHoursLater);
 		assertEquals(Collections.singletonList(rideStarted), results(source));
@@ -81,22 +81,22 @@
 	@Test
 	public void longRide() throws Exception {
 		TaxiRide rideStarted = startRide(1, BEGINNING);
-		Long mark2HoursLater = BEGINNING.plusMinutes(120).getMillis();
-		TaxiRide rideEnded3HoursLater = endRide(rideStarted, BEGINNING.plusHours(3));
+		Long mark2HoursLater = BEGINNING.plusSeconds(120 * 60).toEpochMilli();
+		TaxiRide rideEnded3HoursLater = endRide(rideStarted, BEGINNING.plusSeconds(180 * 60));
 
 		TestRideSource source = new TestRideSource(rideStarted, mark2HoursLater, rideEnded3HoursLater);
 		assertEquals(Collections.singletonList(rideStarted), results(source));
 	}
 
-	private TaxiRide testRide(long rideId, Boolean isStart, DateTime startTime, DateTime endTime) {
+	private TaxiRide testRide(long rideId, Boolean isStart, Instant startTime, Instant endTime) {
 		return new TaxiRide(rideId, isStart, startTime, endTime, -73.9947F, 40.750626F, -73.9947F, 40.750626F, (short) 1, 0, 0);
 	}
 
-	private TaxiRide startRide(long rideId, DateTime startTime) {
-		return testRide(rideId, true, startTime, new DateTime(0));
+	private TaxiRide startRide(long rideId, Instant startTime) {
+		return testRide(rideId, true, startTime, Instant.EPOCH);
 	}
 
-	private TaxiRide endRide(TaxiRide started, DateTime endTime) {
+	private TaxiRide endRide(TaxiRide started, Instant endTime) {
 		return testRide(started.rideId, false, started.startTime, endTime);
 	}
 
diff --git a/ride-cleansing/README.md b/ride-cleansing/README.md
index ab7d4be..0ceb314 100644
--- a/ride-cleansing/README.md
+++ b/ride-cleansing/README.md
@@ -21,7 +21,7 @@
 
 If you haven't already done so, you'll need to first [setup your Flink development environment](../README.md). See [How to do the Labs](../README.md#how-to-do-the-labs) for an overall introduction to these exercises.
 
-The task of the "Taxi Ride Cleansing" exercise is to cleanse a stream of TaxiRide events by removing events that do not start or end in New York City.
+The task of the "Taxi Ride Cleansing" exercise is to cleanse a stream of TaxiRide events by removing events that start or end outside of New York City.
 
 The `GeoUtils` utility class provides a static method `isInNYC(float lon, float lat)` to check if a location is within the NYC area.
 
diff --git a/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java b/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java
index 7e73733..20467b4 100644
--- a/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java
+++ b/ride-cleansing/src/main/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingExercise.java
@@ -19,11 +19,10 @@
 package org.apache.flink.training.exercises.ridecleansing;
 
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.common.sources.TaxiRideSource;
+import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
 import org.apache.flink.training.exercises.common.utils.ExerciseBase;
 import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
 
@@ -33,35 +32,22 @@
  * <p>The task of the exercise is to filter a data stream of taxi ride records to keep only rides that
  * start and end within New York City. The resulting stream should be printed.
  *
- * <p>Parameters:
- *   -input path-to-input-file
  */
 public class RideCleansingExercise extends ExerciseBase {
 
 	/**
 	 * Main method.
 	 *
-	 * <p>Parameters:
-	 *   -input path-to-input-file
-	 *
-	 * @throws Exception which occurs during job execution.
-	 *
 	 * @throws Exception which occurs during job execution.
 	 */
 	public static void main(String[] args) throws Exception {
 
-		ParameterTool params = ParameterTool.fromArgs(args);
-		final String input = params.get("input", ExerciseBase.PATH_TO_RIDE_DATA);
-
-		final int maxEventDelay = 60;       // events are out of order by max 60 seconds
-		final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second
-
 		// set up streaming execution environment
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(ExerciseBase.parallelism);
 
 		// start the data generator
-		DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));
+		DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator()));
 
 		DataStream<TaxiRide> filteredRides = rides
 				// filter out rides that do not start or stop in NYC
diff --git a/ride-cleansing/src/main/scala/org/apache/flink/training/exercises/ridecleansing/scala/RideCleansingExercise.scala b/ride-cleansing/src/main/scala/org/apache/flink/training/exercises/ridecleansing/scala/RideCleansingExercise.scala
index a0e6c6b..fe02b3b 100644
--- a/ride-cleansing/src/main/scala/org/apache/flink/training/exercises/ridecleansing/scala/RideCleansingExercise.scala
+++ b/ride-cleansing/src/main/scala/org/apache/flink/training/exercises/ridecleansing/scala/RideCleansingExercise.scala
@@ -18,10 +18,9 @@
 
 package org.apache.flink.training.exercises.ridecleansing.scala
 
-import org.apache.flink.training.exercises.common.sources.TaxiRideSource
+import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator
 import org.apache.flink.training.exercises.common.utils.ExerciseBase._
 import org.apache.flink.training.exercises.common.utils.{ExerciseBase, GeoUtils, MissingSolutionException}
-import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.scala._
 
 /**
@@ -31,25 +30,17 @@
  * start and end within New York City. The resulting stream should be printed to the
  * standard out.
  *
- * Parameters:
- * -input path-to-input-file
  */
 object RideCleansingExercise extends ExerciseBase {
 
   def main(args: Array[String]) {
-    // parse parameters
-    val params = ParameterTool.fromArgs(args)
-    val input = params.get("input", ExerciseBase.PATH_TO_RIDE_DATA)
-
-    val maxDelay = 60 // events are out of order by max 60 seconds
-    val speed = 600   // events of 10 minutes are served in 1 second
 
     // set up the execution environment
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(parallelism)
 
     // get the taxi ride data stream
-    val rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxDelay, speed)))
+    val rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
 
     val filteredRides = rides
       // filter out rides that do not start and end in NYC
diff --git a/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java b/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
index e16ea83..1d7af4c 100644
--- a/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
+++ b/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
@@ -19,11 +19,10 @@
 package org.apache.flink.training.solutions.ridecleansing;
 
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.common.sources.TaxiRideSource;
+import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
 import org.apache.flink.training.exercises.common.utils.ExerciseBase;
 import org.apache.flink.training.exercises.common.utils.GeoUtils;
 
@@ -33,33 +32,22 @@
  * <p>The task of the exercise is to filter a data stream of taxi ride records to keep only rides that
  * start and end within New York City. The resulting stream should be printed.
  *
- * <p>Parameters:
- *   -input path-to-input-file
  */
 public class RideCleansingSolution extends ExerciseBase {
 
 	/**
 	 * Main method.
 	 *
-	 * <p>Parameters:
-	 *   -input path-to-input-file
-	 *
 	 * @throws Exception which occurs during job execution.
 	 */
 	public static void main(String[] args) throws Exception {
 
-		ParameterTool params = ParameterTool.fromArgs(args);
-		final String input = params.get("input", PATH_TO_RIDE_DATA);
-
-		final int maxEventDelay = 60;       // events are out of order by max 60 seconds
-		final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second
-
 		// set up streaming execution environment
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(ExerciseBase.parallelism);
 
 		// start the data generator
-		DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxEventDelay, servingSpeedFactor)));
+		DataStream<TaxiRide> rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator()));
 
 		DataStream<TaxiRide> filteredRides = rides
 				// keep only those rides and both start and end in NYC
diff --git a/ride-cleansing/src/solution/scala/org/apache/flink/training/solutions/ridecleansing/scala/RideCleansingSolution.scala b/ride-cleansing/src/solution/scala/org/apache/flink/training/solutions/ridecleansing/scala/RideCleansingSolution.scala
index d831259..f60cae7 100644
--- a/ride-cleansing/src/solution/scala/org/apache/flink/training/solutions/ridecleansing/scala/RideCleansingSolution.scala
+++ b/ride-cleansing/src/solution/scala/org/apache/flink/training/solutions/ridecleansing/scala/RideCleansingSolution.scala
@@ -18,9 +18,8 @@
 
 package org.apache.flink.training.solutions.ridecleansing.scala
 
-import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.scala._
-import org.apache.flink.training.exercises.common.sources.TaxiRideSource
+import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator
 import org.apache.flink.training.exercises.common.utils.ExerciseBase._
 import org.apache.flink.training.exercises.common.utils.{ExerciseBase, GeoUtils}
 
@@ -31,25 +30,17 @@
  * start and end within New York City. The resulting stream should be printed to the
  * standard out.
  *
- * Parameters:
- * -input path-to-input-file
  */
 object RideCleansingSolution {
 
   def main(args: Array[String]) {
-    // parse parameters
-    val params = ParameterTool.fromArgs(args)
-    val input = params.get("input", ExerciseBase.PATH_TO_RIDE_DATA)
-
-    val maxDelay = 60 // events are out of order by max 60 seconds
-    val speed = 600   // events of 10 minutes are served in 1 second
 
     // set up the execution environment
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(ExerciseBase.parallelism)
 
     // get the taxi ride data stream
-    val rides = env.addSource(rideSourceOrTest(new TaxiRideSource(input, maxDelay, speed)))
+    val rides = env.addSource(rideSourceOrTest(new TaxiRideGenerator()))
 
     val filteredRides = rides
       .filter(r => GeoUtils.isInNYC(r.startLon, r.startLat) && GeoUtils.isInNYC(r.endLon, r.endLat))
diff --git a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java b/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
index e4554d4..e19f4f3 100644
--- a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
+++ b/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
@@ -22,9 +22,9 @@
 import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
 import org.apache.flink.training.solutions.ridecleansing.RideCleansingSolution;
 
-import org.joda.time.DateTime;
 import org.junit.Test;
 
+import java.time.Instant;
 import java.util.Collections;
 import java.util.List;
 
@@ -55,7 +55,7 @@
 	}
 
 	private TaxiRide testRide(float startLon, float startLat, float endLon, float endLat) {
-		return new TaxiRide(1L, true, new DateTime(0), new DateTime(0),
+		return new TaxiRide(1L, true, Instant.EPOCH, Instant.EPOCH,
 				startLon, startLat, endLon, endLat, (short) 1, 0, 0);
 	}
 
diff --git a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
index 2817f19..64175d2 100644
--- a/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
+++ b/rides-and-fares/src/main/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresExercise.java
@@ -19,15 +19,14 @@
 package org.apache.flink.training.exercises.ridesandfares;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.common.sources.TaxiFareSource;
-import org.apache.flink.training.exercises.common.sources.TaxiRideSource;
+import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
+import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
 import org.apache.flink.training.exercises.common.utils.ExerciseBase;
 import org.apache.flink.training.exercises.common.utils.MissingSolutionException;
 import org.apache.flink.util.Collector;
@@ -37,41 +36,27 @@
  *
  * <p>The goal for this exercise is to enrich TaxiRides with fare information.
  *
- * <p>Parameters:
- * -rides path-to-input-file
- * -fares path-to-input-file
  */
 public class RidesAndFaresExercise extends ExerciseBase {
 
 	/**
 	 * Main method.
 	 *
-	 * <p>Parameters:
-	 * -rides path-to-input-file
-	 * -fares path-to-input-file
-	 *
 	 * @throws Exception which occurs during job execution.
 	 */
 	public static void main(String[] args) throws Exception {
 
-		ParameterTool params = ParameterTool.fromArgs(args);
-		final String ridesFile = params.get("rides", PATH_TO_RIDE_DATA);
-		final String faresFile = params.get("fares", PATH_TO_FARE_DATA);
-
-		final int delay = 60;					// at most 60 seconds of delay
-		final int servingSpeedFactor = 1800; 	// 30 minutes worth of events are served every second
-
 		// set up streaming execution environment
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(ExerciseBase.parallelism);
 
 		DataStream<TaxiRide> rides = env
-				.addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
+				.addSource(rideSourceOrTest(new TaxiRideGenerator()))
 				.filter((TaxiRide ride) -> ride.isStart)
 				.keyBy((TaxiRide ride) -> ride.rideId);
 
 		DataStream<TaxiFare> fares = env
-				.addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
+				.addSource(fareSourceOrTest(new TaxiFareGenerator()))
 				.keyBy((TaxiFare fare) -> fare.rideId);
 
 		DataStream<Tuple2<TaxiRide, TaxiFare>> enrichedRides = rides
diff --git a/rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala b/rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
index cd5b028..8d377d2 100644
--- a/rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
+++ b/rides-and-fares/src/main/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresExercise.scala
@@ -18,11 +18,10 @@
 
 package org.apache.flink.training.exercises.ridesandfares.scala
 
-import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
 import org.apache.flink.training.exercises.common.datatypes.{TaxiFare, TaxiRide}
-import org.apache.flink.training.exercises.common.sources.{TaxiFareSource, TaxiRideSource}
+import org.apache.flink.training.exercises.common.sources.{TaxiFareGenerator, TaxiRideGenerator}
 import org.apache.flink.training.exercises.common.utils.ExerciseBase._
 import org.apache.flink.training.exercises.common.utils.{ExerciseBase, MissingSolutionException}
 import org.apache.flink.util.Collector
@@ -32,33 +31,22 @@
   *
   * The goal for this exercise is to enrich TaxiRides with fare information.
   *
-  * Parameters:
-  * -rides path-to-input-file
-  * -fares path-to-input-file
   */
 object RidesAndFaresExercise {
 
   def main(args: Array[String]) {
 
-    // parse parameters
-    val params = ParameterTool.fromArgs(args)
-    val ridesFile = params.get("rides", ExerciseBase.PATH_TO_RIDE_DATA)
-    val faresFile = params.get("fares", ExerciseBase.PATH_TO_FARE_DATA)
-
-    val delay = 60;               // at most 60 seconds of delay
-    val servingSpeedFactor = 1800 // 30 minutes worth of events are served every second
-
     // set up streaming execution environment
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(ExerciseBase.parallelism)
 
     val rides = env
-      .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
+      .addSource(rideSourceOrTest(new TaxiRideGenerator()))
       .filter { ride => ride.isStart }
       .keyBy { ride => ride.rideId }
 
     val fares = env
-      .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
+      .addSource(fareSourceOrTest(new TaxiFareGenerator()))
       .keyBy { fare => fare.rideId }
 
     val processed = rides
diff --git a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
index 20fb255..bcddc7b 100644
--- a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
+++ b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
@@ -21,7 +21,6 @@
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
@@ -29,8 +28,8 @@
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
 import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-import org.apache.flink.training.exercises.common.sources.TaxiFareSource;
-import org.apache.flink.training.exercises.common.sources.TaxiRideSource;
+import org.apache.flink.training.exercises.common.sources.TaxiFareGenerator;
+import org.apache.flink.training.exercises.common.sources.TaxiRideGenerator;
 import org.apache.flink.training.exercises.common.utils.ExerciseBase;
 import org.apache.flink.util.Collector;
 
@@ -39,30 +38,16 @@
  *
  * <p>The goal for this exercise is to enrich TaxiRides with fare information.
  *
- * <p>Parameters:
- * -rides path-to-input-file
- * -fares path-to-input-file
  */
 public class RidesAndFaresSolution extends ExerciseBase {
 
 	/**
 	 * Main method.
 	 *
-	 * <p>Parameters:
-	 * -rides path-to-input-file
-	 * -fares path-to-input-file
-	 *
 	 * @throws Exception which occurs during job execution.
 	 */
 	public static void main(String[] args) throws Exception {
 
-		ParameterTool params = ParameterTool.fromArgs(args);
-		final String ridesFile = params.get("rides", PATH_TO_RIDE_DATA);
-		final String faresFile = params.get("fares", PATH_TO_FARE_DATA);
-
-		final int delay = 60;					// at most 60 seconds of delay
-		final int servingSpeedFactor = 1800; 	// 30 minutes worth of events are served every second
-
 		// Set up streaming execution environment, including Web UI and REST endpoint.
 		// Checkpointing isn't needed for the RidesAndFares exercise; this setup is for
 		// using the State Processor API.
@@ -79,12 +64,12 @@
 		config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
 
 		DataStream<TaxiRide> rides = env
-				.addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
+				.addSource(rideSourceOrTest(new TaxiRideGenerator()))
 				.filter((TaxiRide ride) -> ride.isStart)
 				.keyBy((TaxiRide ride) -> ride.rideId);
 
 		DataStream<TaxiFare> fares = env
-				.addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
+				.addSource(fareSourceOrTest(new TaxiFareGenerator()))
 				.keyBy((TaxiFare fare) -> fare.rideId);
 
 		// Set a UID on the stateful flatmap operator so we can read its state using the State Processor API.
diff --git a/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala b/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala
index b7add66..2af05cd 100644
--- a/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala
+++ b/rides-and-fares/src/solution/scala/org/apache/flink/training/solutions/ridesandfares/scala/RidesAndFaresSolution.scala
@@ -19,11 +19,10 @@
 package org.apache.flink.training.solutions.ridesandfares.scala
 
 import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
-import org.apache.flink.api.java.utils.ParameterTool
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction
 import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, _}
 import org.apache.flink.training.exercises.common.datatypes.{TaxiFare, TaxiRide}
-import org.apache.flink.training.exercises.common.sources.{TaxiFareSource, TaxiRideSource}
+import org.apache.flink.training.exercises.common.sources.{TaxiFareGenerator, TaxiRideGenerator}
 import org.apache.flink.training.exercises.common.utils.ExerciseBase
 import org.apache.flink.training.exercises.common.utils.ExerciseBase._
 import org.apache.flink.util.Collector
@@ -33,33 +32,22 @@
   *
   * The goal for this exercise is to enrich TaxiRides with fare information.
   *
-  * Parameters:
-  * -rides path-to-input-file
-  * -fares path-to-input-file
   */
 object RidesAndFaresSolution {
 
   def main(args: Array[String]) {
 
-    // parse parameters
-    val params = ParameterTool.fromArgs(args)
-    val ridesFile = params.get("rides", ExerciseBase.PATH_TO_RIDE_DATA)
-    val faresFile = params.get("fares", ExerciseBase.PATH_TO_FARE_DATA)
-
-    val delay = 60;               // at most 60 seconds of delay
-    val servingSpeedFactor = 1800 // 30 minutes worth of events are served every second
-
     // set up streaming execution environment
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     env.setParallelism(ExerciseBase.parallelism)
 
     val rides = env
-      .addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
+      .addSource(rideSourceOrTest(new TaxiRideGenerator()))
       .filter { ride => ride.isStart }
       .keyBy { ride => ride.rideId }
 
     val fares = env
-      .addSource(fareSourceOrTest(new TaxiFareSource(faresFile, delay, servingSpeedFactor)))
+      .addSource(fareSourceOrTest(new TaxiFareGenerator()))
       .keyBy { fare => fare.rideId }
 
     val processed = rides
diff --git a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
index c2b1fb1..76b4407 100644
--- a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
+++ b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
@@ -24,9 +24,9 @@
 import org.apache.flink.training.exercises.testing.TaxiRideTestBase;
 import org.apache.flink.training.solutions.ridesandfares.RidesAndFaresSolution;
 
-import org.joda.time.DateTime;
 import org.junit.Test;
 
+import java.time.Instant;
 import java.util.Arrays;
 import java.util.List;
 
@@ -37,7 +37,6 @@
 
 	static final Testable JAVA_EXERCISE = () -> RidesAndFaresExercise.main(new String[]{});
 
-
 	final TaxiRide ride1 = testRide(1);
 	final TaxiRide ride2 = testRide(2);
 	final TaxiFare fare1 = testFare(1);
@@ -68,12 +67,12 @@
 	}
 
 	private TaxiRide testRide(long rideId) {
-		return new TaxiRide(rideId, true, new DateTime(0), new DateTime(0),
+		return new TaxiRide(rideId, true, Instant.EPOCH, Instant.EPOCH,
 				0F, 0F, 0F, 0F, (short) 1, 0, rideId);
 	}
 
 	private TaxiFare testFare(long rideId) {
-		return new TaxiFare(rideId, 0, rideId, new DateTime(0), "", 0F, 0F, 0F);
+		return new TaxiFare(rideId, 0, rideId, Instant.EPOCH, "", 0F, 0F, 0F);
 	}
 
 	protected List<?> results(TestRideSource rides, TestFareSource fares) throws Exception {