[FLINK-17277] address IntelliJ warnings
diff --git a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
index fe17f93..ac7424a 100644
--- a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
+++ b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
@@ -67,8 +67,8 @@
 		// map each ride to a tuple of (driverId, 1)
 		DataStream<Tuple2<Long, Long>> tuples = rides.map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
 					@Override
-					public Tuple2<Long, Long> map(TaxiRide ride) throws Exception {
-						return new Tuple2<Long, Long>(ride.driverId, 1L);
+					public Tuple2<Long, Long> map(TaxiRide ride) {
+						return Tuple2.of(ride.driverId, 1L);
 					}
 		});
 
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
index 393638e..f1da3e6 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
@@ -34,7 +34,7 @@
  */
 public class TaxiFare implements Serializable {
 
-	private static transient DateTimeFormatter timeFormatter =
+	private static final DateTimeFormatter TIME_FORMATTER =
 			DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();
 
 	/**
@@ -69,17 +69,14 @@
 
 	@Override
 	public String toString() {
-		StringBuilder sb = new StringBuilder();
-		sb.append(rideId).append(",");
-		sb.append(taxiId).append(",");
-		sb.append(driverId).append(",");
-		sb.append(startTime.toString(timeFormatter)).append(",");
-		sb.append(paymentType).append(",");
-		sb.append(tip).append(",");
-		sb.append(tolls).append(",");
-		sb.append(totalFare);
-
-		return sb.toString();
+		return rideId + "," +
+				taxiId + "," +
+				driverId + "," +
+				startTime.toString(TIME_FORMATTER) + "," +
+				paymentType + "," +
+				tip + "," +
+				tolls + "," +
+				totalFare;
 	}
 
 	/**
@@ -98,7 +95,7 @@
 			ride.rideId = Long.parseLong(tokens[0]);
 			ride.taxiId = Long.parseLong(tokens[1]);
 			ride.driverId = Long.parseLong(tokens[2]);
-			ride.startTime = DateTime.parse(tokens[3], timeFormatter);
+			ride.startTime = DateTime.parse(tokens[3], TIME_FORMATTER);
 			ride.paymentType = tokens[4];
 			ride.tip = tokens[5].length() > 0 ? Float.parseFloat(tokens[5]) : 0.0f;
 			ride.tolls = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) : 0.0f;
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
index 7c78dec..a923a00 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
@@ -24,6 +24,8 @@
 import org.joda.time.format.DateTimeFormat;
 import org.joda.time.format.DateTimeFormatter;
 
+import javax.annotation.Nullable;
+
 import java.io.Serializable;
 import java.util.Locale;
 
@@ -45,7 +47,7 @@
  */
 public class TaxiRide implements Comparable<TaxiRide>, Serializable {
 
-	private static transient DateTimeFormatter timeFormatter =
+	private static final DateTimeFormatter TIME_FORMATTER =
 			DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();
 
 	/**
@@ -89,20 +91,17 @@
 
 	@Override
 	public String toString() {
-		StringBuilder sb = new StringBuilder();
-		sb.append(rideId).append(",");
-		sb.append(isStart ? "START" : "END").append(",");
-		sb.append(startTime.toString(timeFormatter)).append(",");
-		sb.append(endTime.toString(timeFormatter)).append(",");
-		sb.append(startLon).append(",");
-		sb.append(startLat).append(",");
-		sb.append(endLon).append(",");
-		sb.append(endLat).append(",");
-		sb.append(passengerCnt).append(",");
-		sb.append(taxiId).append(",");
-		sb.append(driverId);
-
-		return sb.toString();
+		return rideId + "," +
+				(isStart ? "START" : "END") + "," +
+				startTime.toString(TIME_FORMATTER) + "," +
+				endTime.toString(TIME_FORMATTER) + "," +
+				startLon + "," +
+				startLat + "," +
+				endLon + "," +
+				endLat + "," +
+				passengerCnt + "," +
+				taxiId + "," +
+				driverId;
 	}
 
 	/**
@@ -123,13 +122,13 @@
 			switch (tokens[1]) {
 				case "START":
 					ride.isStart = true;
-					ride.startTime = DateTime.parse(tokens[2], timeFormatter);
-					ride.endTime = DateTime.parse(tokens[3], timeFormatter);
+					ride.startTime = DateTime.parse(tokens[2], TIME_FORMATTER);
+					ride.endTime = DateTime.parse(tokens[3], TIME_FORMATTER);
 					break;
 				case "END":
 					ride.isStart = false;
-					ride.endTime = DateTime.parse(tokens[2], timeFormatter);
-					ride.startTime = DateTime.parse(tokens[3], timeFormatter);
+					ride.endTime = DateTime.parse(tokens[2], TIME_FORMATTER);
+					ride.startTime = DateTime.parse(tokens[3], TIME_FORMATTER);
 					break;
 				default:
 					throw new RuntimeException("Invalid record: " + line);
@@ -158,7 +157,7 @@
 	 *     <li>putting START events before END events if they have the same timestamp</li>
 	 * </ul>
 	 */
-	public int compareTo(TaxiRide other) {
+	public int compareTo(@Nullable TaxiRide other) {
 		if (other == null) {
 			return 1;
 		}
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareSource.java b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareSource.java
index c0f3c88..a02eff6 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareSource.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareSource.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.util.Calendar;
 import java.util.Comparator;
 import java.util.PriorityQueue;
@@ -103,7 +104,7 @@
 		}
 		this.dataFilePath = dataFilePath;
 		this.maxDelayMsecs = maxEventDelaySecs * 1000;
-		this.watermarkDelayMSecs = maxDelayMsecs < 10000 ? 10000 : maxDelayMsecs;
+		this.watermarkDelayMSecs = Math.max(maxDelayMsecs, 10000);
 		this.servingSpeed = servingSpeedFactor;
 	}
 
@@ -111,7 +112,7 @@
 	public void run(SourceContext<TaxiFare> sourceContext) throws Exception {
 
 		gzipStream = new GZIPInputStream(new FileInputStream(dataFilePath));
-		reader = new BufferedReader(new InputStreamReader(gzipStream, "UTF-8"));
+		reader = new BufferedReader(new InputStreamReader(gzipStream, StandardCharsets.UTF_8));
 
 		generateUnorderedStream(sourceContext);
 
@@ -130,12 +131,7 @@
 		Random rand = new Random(7452);
 		PriorityQueue<Tuple2<Long, Object>> emitSchedule = new PriorityQueue<>(
 				32,
-				new Comparator<Tuple2<Long, Object>>() {
-					@Override
-					public int compare(Tuple2<Long, Object> o1, Tuple2<Long, Object> o2) {
-						return o1.f0.compareTo(o2.f0);
-					}
-				});
+				Comparator.comparing(o -> o.f0));
 
 		// read first ride and insert it into emit schedule
 		String line;
@@ -148,11 +144,11 @@
 			// get delayed time
 			long delayedEventTime = dataStartTime + getNormalDelayMsecs(rand);
 
-			emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, fare));
+			emitSchedule.add(Tuple2.of(delayedEventTime, fare));
 			// schedule next watermark
 			long watermarkTime = dataStartTime + watermarkDelayMSecs;
 			Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
-			emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
+			emitSchedule.add(Tuple2.of(watermarkTime, nextWatermark));
 
 		} else {
 			return;
@@ -176,7 +172,7 @@
 					) {
 				// insert event into emit schedule
 				long delayedEventTime = rideEventTime + getNormalDelayMsecs(rand);
-				emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, fare));
+				emitSchedule.add(Tuple2.of(delayedEventTime, fare));
 
 				// read next ride
 				if (reader.ready() && (line = reader.readLine()) != null) {
@@ -190,13 +186,14 @@
 			}
 
 			// emit schedule is updated, emit next element in schedule
-			Tuple2<Long, Object> head = emitSchedule.poll();
+			Tuple2<Long, Object> head = emitSchedule.remove();
 			long delayedEventTime = head.f0;
 
 			long now = Calendar.getInstance().getTimeInMillis();
 			long servingTime = toServingTime(servingStartTime, dataStartTime, delayedEventTime);
 			long waitTime = servingTime - now;
 
+			//noinspection BusyWait
 			Thread.sleep((waitTime > 0) ? waitTime : 0);
 
 			if (head.f1 instanceof TaxiFare) {
@@ -211,7 +208,7 @@
 				// schedule next watermark
 				long watermarkTime = delayedEventTime + watermarkDelayMSecs;
 				Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
-				emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
+				emitSchedule.add(Tuple2.of(watermarkTime, nextWatermark));
 			}
 		}
 	}
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideSource.java b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideSource.java
index 6bfb970..b19a310 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideSource.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideSource.java
@@ -28,6 +28,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.util.Calendar;
 import java.util.Comparator;
 import java.util.PriorityQueue;
@@ -103,7 +104,7 @@
 		}
 		this.dataFilePath = dataFilePath;
 		this.maxDelayMsecs = maxEventDelaySecs * 1000;
-		this.watermarkDelayMSecs = maxDelayMsecs < 10000 ? 10000 : maxDelayMsecs;
+		this.watermarkDelayMSecs = Math.max(maxDelayMsecs, 10000);
 		this.servingSpeed = servingSpeedFactor;
 	}
 
@@ -111,7 +112,7 @@
 	public void run(SourceContext<TaxiRide> sourceContext) throws Exception {
 
 		gzipStream = new GZIPInputStream(new FileInputStream(dataFilePath));
-		reader = new BufferedReader(new InputStreamReader(gzipStream, "UTF-8"));
+		reader = new BufferedReader(new InputStreamReader(gzipStream, StandardCharsets.UTF_8));
 
 		generateUnorderedStream(sourceContext);
 
@@ -130,12 +131,7 @@
 		Random rand = new Random(7452);
 		PriorityQueue<Tuple2<Long, Object>> emitSchedule = new PriorityQueue<>(
 				32,
-				new Comparator<Tuple2<Long, Object>>() {
-					@Override
-					public int compare(Tuple2<Long, Object> o1, Tuple2<Long, Object> o2) {
-						return o1.f0.compareTo(o2.f0);
-					}
-				});
+				Comparator.comparing(o -> o.f0));
 
 		// read first ride and insert it into emit schedule
 		String line;
@@ -148,11 +144,11 @@
 			// get delayed time
 			long delayedEventTime = dataStartTime + getNormalDelayMsecs(rand);
 
-			emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));
+			emitSchedule.add(Tuple2.of(delayedEventTime, ride));
 			// schedule next watermark
 			long watermarkTime = dataStartTime + watermarkDelayMSecs;
 			Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
-			emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
+			emitSchedule.add(Tuple2.of(watermarkTime, nextWatermark));
 
 		} else {
 			return;
@@ -176,7 +172,7 @@
 					) {
 				// insert event into emit schedule
 				long delayedEventTime = rideEventTime + getNormalDelayMsecs(rand);
-				emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));
+				emitSchedule.add(Tuple2.of(delayedEventTime, ride));
 
 				// read next ride
 				if (reader.ready() && (line = reader.readLine()) != null) {
@@ -190,13 +186,14 @@
 			}
 
 			// emit schedule is updated, emit next element in schedule
-			Tuple2<Long, Object> head = emitSchedule.poll();
+			Tuple2<Long, Object> head = emitSchedule.remove();
 			long delayedEventTime = head.f0;
 
 			long now = Calendar.getInstance().getTimeInMillis();
 			long servingTime = toServingTime(servingStartTime, dataStartTime, delayedEventTime);
 			long waitTime = servingTime - now;
 
+			//noinspection BusyWait
 			Thread.sleep((waitTime > 0) ? waitTime : 0);
 
 			if (head.f1 instanceof TaxiRide) {
@@ -211,7 +208,7 @@
 				// schedule next watermark
 				long watermarkTime = delayedEventTime + watermarkDelayMSecs;
 				Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
-				emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
+				emitSchedule.add(Tuple2.of(watermarkTime, nextWatermark));
 			}
 		}
 	}
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java b/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java
index 4594248..e638ab3 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java
@@ -245,9 +245,8 @@
 
 		double x = destLat - startLat;
 		double y = (destLon - startLon) * Math.cos(startLat);
-		int degrees = (int) Math.toDegrees(Math.atan2(x, y)) + 179;
 
-		return degrees;
+		return (int) Math.toDegrees(Math.atan2(x, y)) + 179;
 	}
 
 }
diff --git a/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java b/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java
index 6c59de1..f4f5596 100644
--- a/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java
+++ b/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java
@@ -30,34 +30,49 @@
 import java.util.List;
 
 public abstract class TaxiRideTestBase<OUT> {
-	public static class TestRideSource extends TestSource implements ResultTypeQueryable<TaxiRide> {
+	public static class TestRideSource extends TestSource<TaxiRide> implements ResultTypeQueryable<TaxiRide> {
 		public TestRideSource(Object ... eventsOrWatermarks) {
 			this.testStream = eventsOrWatermarks;
 		}
 
 		@Override
+		long getTimestamp(TaxiRide ride) {
+			return ride.getEventTime();
+		}
+
+		@Override
 		public TypeInformation<TaxiRide> getProducedType() {
 			return TypeInformation.of(TaxiRide.class);
 		}
 	}
 
-	public static class TestFareSource extends TestSource implements ResultTypeQueryable<TaxiFare> {
+	public static class TestFareSource extends TestSource<TaxiFare> implements ResultTypeQueryable<TaxiFare> {
 		public TestFareSource(Object ... eventsOrWatermarks) {
 			this.testStream = eventsOrWatermarks;
 		}
 
 		@Override
+		long getTimestamp(TaxiFare fare) {
+			return fare.getEventTime();
+		}
+
+		@Override
 		public TypeInformation<TaxiFare> getProducedType() {
 			return TypeInformation.of(TaxiFare.class);
 		}
 	}
 
-	public static class TestStringSource extends TestSource implements ResultTypeQueryable<String> {
+	public static class TestStringSource extends TestSource<String> implements ResultTypeQueryable<String> {
 		public TestStringSource(Object ... eventsOrWatermarks) {
 			this.testStream = eventsOrWatermarks;
 		}
 
 		@Override
+		long getTimestamp(String s) {
+			return 0L;
+		}
+
+		@Override
 		public TypeInformation<String> getProducedType() {
 			return TypeInformation.of(String.class);
 		}
@@ -69,13 +84,13 @@
 		public static final List VALUES = new ArrayList<>();
 
 		@Override
-		public void invoke(OUT value, Context context) throws Exception {
+		public void invoke(OUT value, Context context) {
 			VALUES.add(value);
 		}
 	}
 
 	public interface Testable {
-		public abstract void main() throws Exception;
+		void main() throws Exception;
 	}
 
 	protected List<OUT> runApp(TestRideSource source, TestSink<OUT> sink, Testable exercise, Testable solution) throws Exception {
diff --git a/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java b/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java
index 57d72ab..84ba53e 100644
--- a/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java
+++ b/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java
@@ -20,35 +20,29 @@
 
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
-import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
 
-public abstract class TestSource implements SourceFunction {
+public abstract class TestSource<T> implements SourceFunction<T> {
 	private volatile boolean running = true;
+	// T or watermark (Long)
 	protected Object[] testStream;
 
 	@Override
-	public void run(SourceContext ctx) throws Exception {
+	public void run(SourceContext<T> ctx) {
 		for (int i = 0; (i < testStream.length) && running; i++) {
-			if (testStream[i] instanceof TaxiRide) {
-				TaxiRide ride = (TaxiRide) testStream[i];
-				ctx.collectWithTimestamp(ride, ride.getEventTime());
-			} else if (testStream[i] instanceof TaxiFare) {
-				TaxiFare fare = (TaxiFare) testStream[i];
-				ctx.collectWithTimestamp(fare, fare.getEventTime());
-			} else if (testStream[i] instanceof String) {
-				String s = (String) testStream[i];
-				ctx.collectWithTimestamp(s, 0);
-			} else if (testStream[i] instanceof Long) {
+			if (testStream[i] instanceof Long) {
 				Long ts = (Long) testStream[i];
 				ctx.emitWatermark(new Watermark(ts));
 			} else {
-				throw new RuntimeException(testStream[i].toString());
+				//noinspection unchecked
+				T element = (T) testStream[i];
+				ctx.collectWithTimestamp(element, getTimestamp(element));
 			}
 		}
 		// test sources are finite, so they have a Long.MAX_VALUE watermark when they finishes
 	}
 
+	abstract long getTimestamp(T element);
+
 	@Override
 	public void cancel() {
 		running = false;
diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml
index 34b0e76..e656fda 100644
--- a/config/checkstyle/checkstyle.xml
+++ b/config/checkstyle/checkstyle.xml
@@ -18,8 +18,8 @@
 under the License.
 -->
 <!DOCTYPE module PUBLIC
-          "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
-          "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+        "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
+        "https://checkstyle.org/dtds/configuration_1_3.dtd">
 
 <!--
 This is a checkstyle configuration file. For descriptions of
diff --git a/config/checkstyle/suppressions.xml b/config/checkstyle/suppressions.xml
index 0b05a64..01e36b5 100644
--- a/config/checkstyle/suppressions.xml
+++ b/config/checkstyle/suppressions.xml
@@ -19,8 +19,8 @@
 -->
 
 <!DOCTYPE suppressions PUBLIC
-		"-//Puppy Crawl//DTD Suppressions 1.1//EN"
-		"http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
+		"-//Checkstyle//DTD SuppressionFilter Configuration 1.2//EN"
+		"https://checkstyle.org/dtds/suppressions_1_2.dtd">
 
 <suppressions>
 	<suppress
diff --git a/hourly-tips/DISCUSSION.md b/hourly-tips/DISCUSSION.md
index 2e91b2d..ab653d3 100644
--- a/hourly-tips/DISCUSSION.md
+++ b/hourly-tips/DISCUSSION.md
@@ -41,7 +41,7 @@
 		for (TaxiFare f : fares) {
 			sumOfTips += f.tip;
 		}
-		out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfTips));
+		out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
 	}
 }
 ```
diff --git a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
index 4e2bb32..b640a19 100644
--- a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
+++ b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
@@ -96,12 +96,12 @@
 	public static class AddTips extends ProcessWindowFunction<
 			TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {
 		@Override
-		public void process(Long key, Context context, Iterable<TaxiFare> fares, Collector<Tuple3<Long, Long, Float>> out) throws Exception {
-			Float sumOfTips = 0F;
+		public void process(Long key, Context context, Iterable<TaxiFare> fares, Collector<Tuple3<Long, Long, Float>> out) {
+			float sumOfTips = 0F;
 			for (TaxiFare f : fares) {
 				sumOfTips += f.tip;
 			}
-			out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfTips));
+			out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
 		}
 	}
 }
diff --git a/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala b/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
index 3ad6243..ab3de6c 100644
--- a/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
+++ b/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
@@ -82,7 +82,7 @@
   class WrapWithWindowInfo() extends ProcessWindowFunction[(Long, Float), (Long, Long, Float), Long, TimeWindow] {
     override def process(key: Long, context: Context, elements: Iterable[(Long, Float)], out: Collector[(Long, Long, Float)]): Unit = {
       val sumOfTips = elements.iterator.next()._2
-      out.collect((context.window.getEnd(), key, sumOfTips))
+      out.collect((context.window.getEnd, key, sumOfTips))
     }
   }
 
diff --git a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
index 565a8d3..eef1bdf 100644
--- a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
+++ b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
@@ -35,7 +35,7 @@
 
 public class HourlyTipsTest extends TaxiRideTestBase<Tuple3<Long, Long, Float>> {
 
-	static Testable javaExercise = () -> HourlyTipsExercise.main(new String[]{});
+	static final Testable JAVA_EXERCISE = () -> HourlyTipsExercise.main(new String[]{});
 
 	@Test
 	public void testOneDriverOneTip() throws Exception {
@@ -45,7 +45,7 @@
 				one
 		);
 
-		Tuple3<Long, Long, Float> max = new Tuple3<Long, Long, Float>(t(60), 1L, 1.0F);
+		Tuple3<Long, Long, Float> max = Tuple3.of(t(60), 1L, 1.0F);
 
 		List<Tuple3<Long, Long, Float>> expected = Collections.singletonList(max);
 
@@ -64,8 +64,8 @@
 				tenIn2
 		);
 
-		Tuple3<Long, Long, Float> hour1 = new Tuple3<Long, Long, Float>(t(60), 1L, 6.0F);
-		Tuple3<Long, Long, Float> hour2 = new Tuple3<Long, Long, Float>(t(120), 1L, 10.0F);
+		Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60), 1L, 6.0F);
+		Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120), 1L, 10.0F);
 
 		List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, hour2);
 
@@ -86,8 +86,8 @@
 				twentyFor2In2
 		);
 
-		Tuple3<Long, Long, Float> hour1 = new Tuple3<Long, Long, Float>(t(60), 1L, 6.0F);
-		Tuple3<Long, Long, Float> hour2 = new Tuple3<Long, Long, Float>(t(120), 2L, 20.0F);
+		Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60), 1L, 6.0F);
+		Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120), 2L, 20.0F);
 
 		List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, hour2);
 
@@ -104,7 +104,7 @@
 
 	protected List<Tuple3<Long, Long, Float>> results(TestFareSource source) throws Exception {
 		Testable javaSolution = () -> HourlyTipsSolution.main(new String[]{});
-		return runApp(source, new TestSink<>(), javaExercise, javaSolution);
+		return runApp(source, new TestSink<>(), JAVA_EXERCISE, javaSolution);
 	}
 
 }
diff --git a/hourly-tips/src/test/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsTest.scala b/hourly-tips/src/test/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsTest.scala
index fbcedf8..aacaab5 100644
--- a/hourly-tips/src/test/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsTest.scala
+++ b/hourly-tips/src/test/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsTest.scala
@@ -32,7 +32,7 @@
   override protected def results(source: TaxiRideTestBase.TestFareSource): util.List[tuple.Tuple3[java.lang.Long, java.lang.Long, java.lang.Float]] = {
     val scalaSolution: TaxiRideTestBase.Testable = () => HourlyTipsSolution.main(Array.empty[String])
     val tuples: util.List[_] = runApp(source, new TaxiRideTestBase.TestSink[tuple.Tuple3[java.lang.Long, java.lang.Long, java.lang.Float]], scalaExercise, scalaSolution)
-    javaTuples(tuples.asInstanceOf[util.List[Tuple3[Long, Long, Float]]])
+    javaTuples(tuples.asInstanceOf[util.List[(Long, Long, Float)]])
   }
 
   private def javaTuples(a: util.List[(Long, Long, Float)]): util.ArrayList[tuple.Tuple3[java.lang.Long, java.lang.Long, java.lang.Float]] = {
diff --git a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
index 9bd68ae..ba3e6b6 100644
--- a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
+++ b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
@@ -32,14 +32,14 @@
 
 public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
 
-	static Testable javaExercise = () -> LongRidesExercise.main(new String[]{});
+	static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new String[]{});
 
-	private DateTime beginning = new DateTime(2000, 1, 1, 0, 0);
+	private static final DateTime BEGINNING = new DateTime(2000, 1, 1, 0, 0);
 
 	@Test
 	public void shortRide() throws Exception {
-		DateTime oneMinLater = beginning.plusMinutes(1);
-		TaxiRide rideStarted = startRide(1, beginning);
+		DateTime oneMinLater = BEGINNING.plusMinutes(1);
+		TaxiRide rideStarted = startRide(1, BEGINNING);
 		TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
 		Long markOneMinLater = oneMinLater.getMillis();
 
@@ -49,8 +49,8 @@
 
 	@Test
 	public void outOfOrder() throws Exception {
-		DateTime oneMinLater = beginning.plusMinutes(1);
-		TaxiRide rideStarted = startRide(1, beginning);
+		DateTime oneMinLater = BEGINNING.plusMinutes(1);
+		TaxiRide rideStarted = startRide(1, BEGINNING);
 		TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
 		Long markOneMinLater = oneMinLater.getMillis();
 
@@ -60,8 +60,8 @@
 
 	@Test
 	public void noStartShort() throws Exception {
-		DateTime oneMinLater = beginning.plusMinutes(1);
-		TaxiRide rideStarted = startRide(1, beginning);
+		DateTime oneMinLater = BEGINNING.plusMinutes(1);
+		TaxiRide rideStarted = startRide(1, BEGINNING);
 		TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
 		Long markOneMinLater = oneMinLater.getMillis();
 
@@ -71,8 +71,8 @@
 
 	@Test
 	public void noEnd() throws Exception {
-		TaxiRide rideStarted = startRide(1, beginning);
-		Long markThreeHoursLater = beginning.plusHours(3).getMillis();
+		TaxiRide rideStarted = startRide(1, BEGINNING);
+		Long markThreeHoursLater = BEGINNING.plusHours(3).getMillis();
 
 		TestRideSource source = new TestRideSource(rideStarted, markThreeHoursLater);
 		assertEquals(Collections.singletonList(rideStarted), results(source));
@@ -80,9 +80,9 @@
 
 	@Test
 	public void longRide() throws Exception {
-		TaxiRide rideStarted = startRide(1, beginning);
-		Long mark2HoursLater = beginning.plusMinutes(120).getMillis();
-		TaxiRide rideEnded3HoursLater = endRide(rideStarted, beginning.plusHours(3));
+		TaxiRide rideStarted = startRide(1, BEGINNING);
+		Long mark2HoursLater = BEGINNING.plusMinutes(120).getMillis();
+		TaxiRide rideEnded3HoursLater = endRide(rideStarted, BEGINNING.plusHours(3));
 
 		TestRideSource source = new TestRideSource(rideStarted, mark2HoursLater, rideEnded3HoursLater);
 		assertEquals(Collections.singletonList(rideStarted), results(source));
@@ -102,7 +102,7 @@
 
 	protected List<TaxiRide> results(TestRideSource source) throws Exception {
 		Testable javaSolution = () -> LongRidesSolution.main(new String[]{});
-		return runApp(source, new TestSink<>(), javaExercise, javaSolution);
+		return runApp(source, new TestSink<>(), JAVA_EXERCISE, javaSolution);
 	}
 
 }
diff --git a/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java b/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
index cf3dca5..e16ea83 100644
--- a/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
+++ b/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
@@ -74,8 +74,7 @@
 
 	public static class NYCFilter implements FilterFunction<TaxiRide> {
 		@Override
-		public boolean filter(TaxiRide taxiRide) throws Exception {
-
+		public boolean filter(TaxiRide taxiRide) {
 			return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) &&
 					GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
 		}
diff --git a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java b/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
index b0e151f..e4554d4 100644
--- a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
+++ b/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
@@ -32,7 +32,7 @@
 
 public class RideCleansingTest extends TaxiRideTestBase<TaxiRide> {
 
-	static Testable javaExercise = () -> RideCleansingExercise.main(new String[]{});
+	static final Testable JAVA_EXERCISE = () -> RideCleansingExercise.main(new String[]{});
 
 	@Test
 	public void testInNYC() throws Exception {
@@ -61,7 +61,7 @@
 
 	protected List<?> results(TestRideSource source) throws Exception {
 		Testable javaSolution = () -> RideCleansingSolution.main(new String[]{});
-		return runApp(source, new TestSink<>(), javaExercise, javaSolution);
+		return runApp(source, new TestSink<>(), JAVA_EXERCISE, javaSolution);
 	}
 
 }
diff --git a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
index 961aa40..0254540 100644
--- a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
+++ b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
@@ -114,7 +114,7 @@
 			TaxiFare fare = fareState.value();
 			if (fare != null) {
 				fareState.clear();
-				out.collect(new Tuple2(ride, fare));
+				out.collect(Tuple2.of(ride, fare));
 			} else {
 				rideState.update(ride);
 			}
@@ -125,7 +125,7 @@
 			TaxiRide ride = rideState.value();
 			if (ride != null) {
 				rideState.clear();
-				out.collect(new Tuple2(ride, fare));
+				out.collect(Tuple2.of(ride, fare));
 			} else {
 				fareState.update(fare);
 			}
diff --git a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
index e1d270a..c2b1fb1 100644
--- a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
+++ b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
@@ -35,7 +35,7 @@
 
 public class RidesAndFaresTest extends TaxiRideTestBase<Tuple2<TaxiRide, TaxiFare>> {
 
-	static Testable javaExercise = () -> RidesAndFaresExercise.main(new String[]{});
+	static final Testable JAVA_EXERCISE = () -> RidesAndFaresExercise.main(new String[]{});
 
 
 	final TaxiRide ride1 = testRide(1);
@@ -49,8 +49,8 @@
 		TestFareSource fares = new TestFareSource(fare1, fare2);
 
 		List<Tuple2<TaxiRide, TaxiFare>> expected = Arrays.asList(
-				new Tuple2<>(ride1, fare1),
-				new Tuple2<>(ride2, fare2));
+				Tuple2.of(ride1, fare1),
+				Tuple2.of(ride2, fare2));
 
 		assertThat("Join results don't match", results(rides, fares), containsInAnyOrder(expected.toArray()));
 	}
@@ -61,8 +61,8 @@
 		TestFareSource fares = new TestFareSource(fare2, fare1);
 
 		List<Tuple2<TaxiRide, TaxiFare>> expected = Arrays.asList(
-				new Tuple2<>(ride1, fare1),
-				new Tuple2<>(ride2, fare2));
+				Tuple2.of(ride1, fare1),
+				Tuple2.of(ride2, fare2));
 
 		assertThat("Join results don't match", results(rides, fares), containsInAnyOrder(expected.toArray()));
 	}
@@ -78,7 +78,7 @@
 
 	protected List<?> results(TestRideSource rides, TestFareSource fares) throws Exception {
 		Testable javaSolution = () -> RidesAndFaresSolution.main(new String[]{});
-		return runApp(rides, fares, new TestSink<>(), javaExercise, javaSolution);
+		return runApp(rides, fares, new TestSink<>(), JAVA_EXERCISE, javaSolution);
 	}
 
 }
diff --git a/rides-and-fares/src/test/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresTest.scala b/rides-and-fares/src/test/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresTest.scala
index c162122..03f7e6a 100644
--- a/rides-and-fares/src/test/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresTest.scala
+++ b/rides-and-fares/src/test/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresTest.scala
@@ -33,7 +33,7 @@
   override protected def results(rides: TaxiRideTestBase.TestRideSource, fares: TaxiRideTestBase.TestFareSource): util.List[tuple.Tuple2[TaxiRide, TaxiFare]] = {
     val scalaSolution: TaxiRideTestBase.Testable = () => RidesAndFaresSolution.main(Array.empty[String])
     val tuples: util.List[_] = runApp(rides, fares, new TaxiRideTestBase.TestSink[tuple.Tuple2[TaxiRide, TaxiFare]], scalaExercise, scalaSolution)
-    javaTuples(tuples.asInstanceOf[util.List[Tuple2[TaxiRide, TaxiFare]]])
+    javaTuples(tuples.asInstanceOf[util.List[(TaxiRide, TaxiFare)]])
   }
 
   private def javaTuples(a: util.List[(TaxiRide, TaxiFare)]): util.ArrayList[tuple.Tuple2[TaxiRide, TaxiFare]] = {