[FLINK-17277] address IntelliJ warnings
diff --git a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
index fe17f93..ac7424a 100644
--- a/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
+++ b/common/src/main/java/org/apache/flink/training/examples/ridecount/RideCountExample.java
@@ -67,8 +67,8 @@
// map each ride to a tuple of (driverId, 1)
DataStream<Tuple2<Long, Long>> tuples = rides.map(new MapFunction<TaxiRide, Tuple2<Long, Long>>() {
@Override
- public Tuple2<Long, Long> map(TaxiRide ride) throws Exception {
- return new Tuple2<Long, Long>(ride.driverId, 1L);
+ public Tuple2<Long, Long> map(TaxiRide ride) {
+ return Tuple2.of(ride.driverId, 1L);
}
});
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
index 393638e..f1da3e6 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiFare.java
@@ -34,7 +34,7 @@
*/
public class TaxiFare implements Serializable {
- private static transient DateTimeFormatter timeFormatter =
+ private static final DateTimeFormatter TIME_FORMATTER =
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();
/**
@@ -69,17 +69,14 @@
@Override
public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(rideId).append(",");
- sb.append(taxiId).append(",");
- sb.append(driverId).append(",");
- sb.append(startTime.toString(timeFormatter)).append(",");
- sb.append(paymentType).append(",");
- sb.append(tip).append(",");
- sb.append(tolls).append(",");
- sb.append(totalFare);
-
- return sb.toString();
+ return rideId + "," +
+ taxiId + "," +
+ driverId + "," +
+ startTime.toString(TIME_FORMATTER) + "," +
+ paymentType + "," +
+ tip + "," +
+ tolls + "," +
+ totalFare;
}
/**
@@ -98,7 +95,7 @@
ride.rideId = Long.parseLong(tokens[0]);
ride.taxiId = Long.parseLong(tokens[1]);
ride.driverId = Long.parseLong(tokens[2]);
- ride.startTime = DateTime.parse(tokens[3], timeFormatter);
+ ride.startTime = DateTime.parse(tokens[3], TIME_FORMATTER);
ride.paymentType = tokens[4];
ride.tip = tokens[5].length() > 0 ? Float.parseFloat(tokens[5]) : 0.0f;
ride.tolls = tokens[6].length() > 0 ? Float.parseFloat(tokens[6]) : 0.0f;
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
index 7c78dec..a923a00 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/datatypes/TaxiRide.java
@@ -24,6 +24,8 @@
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
+import javax.annotation.Nullable;
+
import java.io.Serializable;
import java.util.Locale;
@@ -45,7 +47,7 @@
*/
public class TaxiRide implements Comparable<TaxiRide>, Serializable {
- private static transient DateTimeFormatter timeFormatter =
+ private static final DateTimeFormatter TIME_FORMATTER =
DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss").withLocale(Locale.US).withZoneUTC();
/**
@@ -89,20 +91,17 @@
@Override
public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(rideId).append(",");
- sb.append(isStart ? "START" : "END").append(",");
- sb.append(startTime.toString(timeFormatter)).append(",");
- sb.append(endTime.toString(timeFormatter)).append(",");
- sb.append(startLon).append(",");
- sb.append(startLat).append(",");
- sb.append(endLon).append(",");
- sb.append(endLat).append(",");
- sb.append(passengerCnt).append(",");
- sb.append(taxiId).append(",");
- sb.append(driverId);
-
- return sb.toString();
+ return rideId + "," +
+ (isStart ? "START" : "END") + "," +
+ startTime.toString(TIME_FORMATTER) + "," +
+ endTime.toString(TIME_FORMATTER) + "," +
+ startLon + "," +
+ startLat + "," +
+ endLon + "," +
+ endLat + "," +
+ passengerCnt + "," +
+ taxiId + "," +
+ driverId;
}
/**
@@ -123,13 +122,13 @@
switch (tokens[1]) {
case "START":
ride.isStart = true;
- ride.startTime = DateTime.parse(tokens[2], timeFormatter);
- ride.endTime = DateTime.parse(tokens[3], timeFormatter);
+ ride.startTime = DateTime.parse(tokens[2], TIME_FORMATTER);
+ ride.endTime = DateTime.parse(tokens[3], TIME_FORMATTER);
break;
case "END":
ride.isStart = false;
- ride.endTime = DateTime.parse(tokens[2], timeFormatter);
- ride.startTime = DateTime.parse(tokens[3], timeFormatter);
+ ride.endTime = DateTime.parse(tokens[2], TIME_FORMATTER);
+ ride.startTime = DateTime.parse(tokens[3], TIME_FORMATTER);
break;
default:
throw new RuntimeException("Invalid record: " + line);
@@ -158,7 +157,7 @@
* <li>putting START events before END events if they have the same timestamp</li>
* </ul>
*/
- public int compareTo(TaxiRide other) {
+ public int compareTo(@Nullable TaxiRide other) {
if (other == null) {
return 1;
}
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareSource.java b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareSource.java
index c0f3c88..a02eff6 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareSource.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiFareSource.java
@@ -28,6 +28,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.util.Calendar;
import java.util.Comparator;
import java.util.PriorityQueue;
@@ -103,7 +104,7 @@
}
this.dataFilePath = dataFilePath;
this.maxDelayMsecs = maxEventDelaySecs * 1000;
- this.watermarkDelayMSecs = maxDelayMsecs < 10000 ? 10000 : maxDelayMsecs;
+ this.watermarkDelayMSecs = Math.max(maxDelayMsecs, 10000);
this.servingSpeed = servingSpeedFactor;
}
@@ -111,7 +112,7 @@
public void run(SourceContext<TaxiFare> sourceContext) throws Exception {
gzipStream = new GZIPInputStream(new FileInputStream(dataFilePath));
- reader = new BufferedReader(new InputStreamReader(gzipStream, "UTF-8"));
+ reader = new BufferedReader(new InputStreamReader(gzipStream, StandardCharsets.UTF_8));
generateUnorderedStream(sourceContext);
@@ -130,12 +131,7 @@
Random rand = new Random(7452);
PriorityQueue<Tuple2<Long, Object>> emitSchedule = new PriorityQueue<>(
32,
- new Comparator<Tuple2<Long, Object>>() {
- @Override
- public int compare(Tuple2<Long, Object> o1, Tuple2<Long, Object> o2) {
- return o1.f0.compareTo(o2.f0);
- }
- });
+ Comparator.comparing(o -> o.f0));
// read first ride and insert it into emit schedule
String line;
@@ -148,11 +144,11 @@
// get delayed time
long delayedEventTime = dataStartTime + getNormalDelayMsecs(rand);
- emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, fare));
+ emitSchedule.add(Tuple2.of(delayedEventTime, fare));
// schedule next watermark
long watermarkTime = dataStartTime + watermarkDelayMSecs;
Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
- emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
+ emitSchedule.add(Tuple2.of(watermarkTime, nextWatermark));
} else {
return;
@@ -176,7 +172,7 @@
) {
// insert event into emit schedule
long delayedEventTime = rideEventTime + getNormalDelayMsecs(rand);
- emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, fare));
+ emitSchedule.add(Tuple2.of(delayedEventTime, fare));
// read next ride
if (reader.ready() && (line = reader.readLine()) != null) {
@@ -190,13 +186,14 @@
}
// emit schedule is updated, emit next element in schedule
- Tuple2<Long, Object> head = emitSchedule.poll();
+ Tuple2<Long, Object> head = emitSchedule.remove();
long delayedEventTime = head.f0;
long now = Calendar.getInstance().getTimeInMillis();
long servingTime = toServingTime(servingStartTime, dataStartTime, delayedEventTime);
long waitTime = servingTime - now;
+ //noinspection BusyWait
Thread.sleep((waitTime > 0) ? waitTime : 0);
if (head.f1 instanceof TaxiFare) {
@@ -211,7 +208,7 @@
// schedule next watermark
long watermarkTime = delayedEventTime + watermarkDelayMSecs;
Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
- emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
+ emitSchedule.add(Tuple2.of(watermarkTime, nextWatermark));
}
}
}
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideSource.java b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideSource.java
index 6bfb970..b19a310 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideSource.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/sources/TaxiRideSource.java
@@ -28,6 +28,7 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
import java.util.Calendar;
import java.util.Comparator;
import java.util.PriorityQueue;
@@ -103,7 +104,7 @@
}
this.dataFilePath = dataFilePath;
this.maxDelayMsecs = maxEventDelaySecs * 1000;
- this.watermarkDelayMSecs = maxDelayMsecs < 10000 ? 10000 : maxDelayMsecs;
+ this.watermarkDelayMSecs = Math.max(maxDelayMsecs, 10000);
this.servingSpeed = servingSpeedFactor;
}
@@ -111,7 +112,7 @@
public void run(SourceContext<TaxiRide> sourceContext) throws Exception {
gzipStream = new GZIPInputStream(new FileInputStream(dataFilePath));
- reader = new BufferedReader(new InputStreamReader(gzipStream, "UTF-8"));
+ reader = new BufferedReader(new InputStreamReader(gzipStream, StandardCharsets.UTF_8));
generateUnorderedStream(sourceContext);
@@ -130,12 +131,7 @@
Random rand = new Random(7452);
PriorityQueue<Tuple2<Long, Object>> emitSchedule = new PriorityQueue<>(
32,
- new Comparator<Tuple2<Long, Object>>() {
- @Override
- public int compare(Tuple2<Long, Object> o1, Tuple2<Long, Object> o2) {
- return o1.f0.compareTo(o2.f0);
- }
- });
+ Comparator.comparing(o -> o.f0));
// read first ride and insert it into emit schedule
String line;
@@ -148,11 +144,11 @@
// get delayed time
long delayedEventTime = dataStartTime + getNormalDelayMsecs(rand);
- emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));
+ emitSchedule.add(Tuple2.of(delayedEventTime, ride));
// schedule next watermark
long watermarkTime = dataStartTime + watermarkDelayMSecs;
Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
- emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
+ emitSchedule.add(Tuple2.of(watermarkTime, nextWatermark));
} else {
return;
@@ -176,7 +172,7 @@
) {
// insert event into emit schedule
long delayedEventTime = rideEventTime + getNormalDelayMsecs(rand);
- emitSchedule.add(new Tuple2<Long, Object>(delayedEventTime, ride));
+ emitSchedule.add(Tuple2.of(delayedEventTime, ride));
// read next ride
if (reader.ready() && (line = reader.readLine()) != null) {
@@ -190,13 +186,14 @@
}
// emit schedule is updated, emit next element in schedule
- Tuple2<Long, Object> head = emitSchedule.poll();
+ Tuple2<Long, Object> head = emitSchedule.remove();
long delayedEventTime = head.f0;
long now = Calendar.getInstance().getTimeInMillis();
long servingTime = toServingTime(servingStartTime, dataStartTime, delayedEventTime);
long waitTime = servingTime - now;
+ //noinspection BusyWait
Thread.sleep((waitTime > 0) ? waitTime : 0);
if (head.f1 instanceof TaxiRide) {
@@ -211,7 +208,7 @@
// schedule next watermark
long watermarkTime = delayedEventTime + watermarkDelayMSecs;
Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
- emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
+ emitSchedule.add(Tuple2.of(watermarkTime, nextWatermark));
}
}
}
diff --git a/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java b/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java
index 4594248..e638ab3 100644
--- a/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java
+++ b/common/src/main/java/org/apache/flink/training/exercises/common/utils/GeoUtils.java
@@ -245,9 +245,8 @@
double x = destLat - startLat;
double y = (destLon - startLon) * Math.cos(startLat);
- int degrees = (int) Math.toDegrees(Math.atan2(x, y)) + 179;
- return degrees;
+ return (int) Math.toDegrees(Math.atan2(x, y)) + 179;
}
}
diff --git a/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java b/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java
index 6c59de1..f4f5596 100644
--- a/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java
+++ b/common/src/test/java/org/apache/flink/training/exercises/testing/TaxiRideTestBase.java
@@ -30,34 +30,49 @@
import java.util.List;
public abstract class TaxiRideTestBase<OUT> {
- public static class TestRideSource extends TestSource implements ResultTypeQueryable<TaxiRide> {
+ public static class TestRideSource extends TestSource<TaxiRide> implements ResultTypeQueryable<TaxiRide> {
public TestRideSource(Object ... eventsOrWatermarks) {
this.testStream = eventsOrWatermarks;
}
@Override
+ long getTimestamp(TaxiRide ride) {
+ return ride.getEventTime();
+ }
+
+ @Override
public TypeInformation<TaxiRide> getProducedType() {
return TypeInformation.of(TaxiRide.class);
}
}
- public static class TestFareSource extends TestSource implements ResultTypeQueryable<TaxiFare> {
+ public static class TestFareSource extends TestSource<TaxiFare> implements ResultTypeQueryable<TaxiFare> {
public TestFareSource(Object ... eventsOrWatermarks) {
this.testStream = eventsOrWatermarks;
}
@Override
+ long getTimestamp(TaxiFare fare) {
+ return fare.getEventTime();
+ }
+
+ @Override
public TypeInformation<TaxiFare> getProducedType() {
return TypeInformation.of(TaxiFare.class);
}
}
- public static class TestStringSource extends TestSource implements ResultTypeQueryable<String> {
+ public static class TestStringSource extends TestSource<String> implements ResultTypeQueryable<String> {
public TestStringSource(Object ... eventsOrWatermarks) {
this.testStream = eventsOrWatermarks;
}
@Override
+ long getTimestamp(String s) {
+ return 0L;
+ }
+
+ @Override
public TypeInformation<String> getProducedType() {
return TypeInformation.of(String.class);
}
@@ -69,13 +84,13 @@
public static final List VALUES = new ArrayList<>();
@Override
- public void invoke(OUT value, Context context) throws Exception {
+ public void invoke(OUT value, Context context) {
VALUES.add(value);
}
}
public interface Testable {
- public abstract void main() throws Exception;
+ void main() throws Exception;
}
protected List<OUT> runApp(TestRideSource source, TestSink<OUT> sink, Testable exercise, Testable solution) throws Exception {
diff --git a/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java b/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java
index 57d72ab..84ba53e 100644
--- a/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java
+++ b/common/src/test/java/org/apache/flink/training/exercises/testing/TestSource.java
@@ -20,35 +20,29 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.training.exercises.common.datatypes.TaxiFare;
-import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
-public abstract class TestSource implements SourceFunction {
+public abstract class TestSource<T> implements SourceFunction<T> {
private volatile boolean running = true;
+ // T or watermark (Long)
protected Object[] testStream;
@Override
- public void run(SourceContext ctx) throws Exception {
+ public void run(SourceContext<T> ctx) {
for (int i = 0; (i < testStream.length) && running; i++) {
- if (testStream[i] instanceof TaxiRide) {
- TaxiRide ride = (TaxiRide) testStream[i];
- ctx.collectWithTimestamp(ride, ride.getEventTime());
- } else if (testStream[i] instanceof TaxiFare) {
- TaxiFare fare = (TaxiFare) testStream[i];
- ctx.collectWithTimestamp(fare, fare.getEventTime());
- } else if (testStream[i] instanceof String) {
- String s = (String) testStream[i];
- ctx.collectWithTimestamp(s, 0);
- } else if (testStream[i] instanceof Long) {
+ if (testStream[i] instanceof Long) {
Long ts = (Long) testStream[i];
ctx.emitWatermark(new Watermark(ts));
} else {
- throw new RuntimeException(testStream[i].toString());
+ //noinspection unchecked
+ T element = (T) testStream[i];
+ ctx.collectWithTimestamp(element, getTimestamp(element));
}
}
// test sources are finite, so they have a Long.MAX_VALUE watermark when they finishes
}
+ abstract long getTimestamp(T element);
+
@Override
public void cancel() {
running = false;
diff --git a/config/checkstyle/checkstyle.xml b/config/checkstyle/checkstyle.xml
index 34b0e76..e656fda 100644
--- a/config/checkstyle/checkstyle.xml
+++ b/config/checkstyle/checkstyle.xml
@@ -18,8 +18,8 @@
under the License.
-->
<!DOCTYPE module PUBLIC
- "-//Puppy Crawl//DTD Check Configuration 1.3//EN"
- "http://www.puppycrawl.com/dtds/configuration_1_3.dtd">
+ "-//Checkstyle//DTD Checkstyle Configuration 1.3//EN"
+ "https://checkstyle.org/dtds/configuration_1_3.dtd">
<!--
This is a checkstyle configuration file. For descriptions of
diff --git a/config/checkstyle/suppressions.xml b/config/checkstyle/suppressions.xml
index 0b05a64..01e36b5 100644
--- a/config/checkstyle/suppressions.xml
+++ b/config/checkstyle/suppressions.xml
@@ -19,8 +19,8 @@
-->
<!DOCTYPE suppressions PUBLIC
- "-//Puppy Crawl//DTD Suppressions 1.1//EN"
- "http://www.puppycrawl.com/dtds/suppressions_1_1.dtd">
+ "-//Checkstyle//DTD SuppressionFilter Configuration 1.2//EN"
+ "https://checkstyle.org/dtds/suppressions_1_2.dtd">
<suppressions>
<suppress
diff --git a/hourly-tips/DISCUSSION.md b/hourly-tips/DISCUSSION.md
index 2e91b2d..ab653d3 100644
--- a/hourly-tips/DISCUSSION.md
+++ b/hourly-tips/DISCUSSION.md
@@ -41,7 +41,7 @@
for (TaxiFare f : fares) {
sumOfTips += f.tip;
}
- out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfTips));
+ out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
}
}
```
diff --git a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
index 4e2bb32..b640a19 100644
--- a/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
+++ b/hourly-tips/src/solution/java/org/apache/flink/training/solutions/hourlytips/HourlyTipsSolution.java
@@ -96,12 +96,12 @@
public static class AddTips extends ProcessWindowFunction<
TaxiFare, Tuple3<Long, Long, Float>, Long, TimeWindow> {
@Override
- public void process(Long key, Context context, Iterable<TaxiFare> fares, Collector<Tuple3<Long, Long, Float>> out) throws Exception {
- Float sumOfTips = 0F;
+ public void process(Long key, Context context, Iterable<TaxiFare> fares, Collector<Tuple3<Long, Long, Float>> out) {
+ float sumOfTips = 0F;
for (TaxiFare f : fares) {
sumOfTips += f.tip;
}
- out.collect(new Tuple3<>(context.window().getEnd(), key, sumOfTips));
+ out.collect(Tuple3.of(context.window().getEnd(), key, sumOfTips));
}
}
}
diff --git a/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala b/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
index 3ad6243..ab3de6c 100644
--- a/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
+++ b/hourly-tips/src/solution/scala/org/apache/flink/training/solutions/hourlytips/scala/HourlyTipsSolution.scala
@@ -82,7 +82,7 @@
class WrapWithWindowInfo() extends ProcessWindowFunction[(Long, Float), (Long, Long, Float), Long, TimeWindow] {
override def process(key: Long, context: Context, elements: Iterable[(Long, Float)], out: Collector[(Long, Long, Float)]): Unit = {
val sumOfTips = elements.iterator.next()._2
- out.collect((context.window.getEnd(), key, sumOfTips))
+ out.collect((context.window.getEnd, key, sumOfTips))
}
}
diff --git a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
index 565a8d3..eef1bdf 100644
--- a/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
+++ b/hourly-tips/src/test/java/org/apache/flink/training/exercises/hourlytips/HourlyTipsTest.java
@@ -35,7 +35,7 @@
public class HourlyTipsTest extends TaxiRideTestBase<Tuple3<Long, Long, Float>> {
- static Testable javaExercise = () -> HourlyTipsExercise.main(new String[]{});
+ static final Testable JAVA_EXERCISE = () -> HourlyTipsExercise.main(new String[]{});
@Test
public void testOneDriverOneTip() throws Exception {
@@ -45,7 +45,7 @@
one
);
- Tuple3<Long, Long, Float> max = new Tuple3<Long, Long, Float>(t(60), 1L, 1.0F);
+ Tuple3<Long, Long, Float> max = Tuple3.of(t(60), 1L, 1.0F);
List<Tuple3<Long, Long, Float>> expected = Collections.singletonList(max);
@@ -64,8 +64,8 @@
tenIn2
);
- Tuple3<Long, Long, Float> hour1 = new Tuple3<Long, Long, Float>(t(60), 1L, 6.0F);
- Tuple3<Long, Long, Float> hour2 = new Tuple3<Long, Long, Float>(t(120), 1L, 10.0F);
+ Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60), 1L, 6.0F);
+ Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120), 1L, 10.0F);
List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, hour2);
@@ -86,8 +86,8 @@
twentyFor2In2
);
- Tuple3<Long, Long, Float> hour1 = new Tuple3<Long, Long, Float>(t(60), 1L, 6.0F);
- Tuple3<Long, Long, Float> hour2 = new Tuple3<Long, Long, Float>(t(120), 2L, 20.0F);
+ Tuple3<Long, Long, Float> hour1 = Tuple3.of(t(60), 1L, 6.0F);
+ Tuple3<Long, Long, Float> hour2 = Tuple3.of(t(120), 2L, 20.0F);
List<Tuple3<Long, Long, Float>> expected = Arrays.asList(hour1, hour2);
@@ -104,7 +104,7 @@
protected List<Tuple3<Long, Long, Float>> results(TestFareSource source) throws Exception {
Testable javaSolution = () -> HourlyTipsSolution.main(new String[]{});
- return runApp(source, new TestSink<>(), javaExercise, javaSolution);
+ return runApp(source, new TestSink<>(), JAVA_EXERCISE, javaSolution);
}
}
diff --git a/hourly-tips/src/test/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsTest.scala b/hourly-tips/src/test/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsTest.scala
index fbcedf8..aacaab5 100644
--- a/hourly-tips/src/test/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsTest.scala
+++ b/hourly-tips/src/test/scala/org/apache/flink/training/exercises/hourlytips/scala/HourlyTipsTest.scala
@@ -32,7 +32,7 @@
override protected def results(source: TaxiRideTestBase.TestFareSource): util.List[tuple.Tuple3[java.lang.Long, java.lang.Long, java.lang.Float]] = {
val scalaSolution: TaxiRideTestBase.Testable = () => HourlyTipsSolution.main(Array.empty[String])
val tuples: util.List[_] = runApp(source, new TaxiRideTestBase.TestSink[tuple.Tuple3[java.lang.Long, java.lang.Long, java.lang.Float]], scalaExercise, scalaSolution)
- javaTuples(tuples.asInstanceOf[util.List[Tuple3[Long, Long, Float]]])
+ javaTuples(tuples.asInstanceOf[util.List[(Long, Long, Float)]])
}
private def javaTuples(a: util.List[(Long, Long, Float)]): util.ArrayList[tuple.Tuple3[java.lang.Long, java.lang.Long, java.lang.Float]] = {
diff --git a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
index 9bd68ae..ba3e6b6 100644
--- a/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
+++ b/long-ride-alerts/src/test/java/org/apache/flink/training/exercises/longrides/LongRidesTest.java
@@ -32,14 +32,14 @@
public class LongRidesTest extends TaxiRideTestBase<TaxiRide> {
- static Testable javaExercise = () -> LongRidesExercise.main(new String[]{});
+ static final Testable JAVA_EXERCISE = () -> LongRidesExercise.main(new String[]{});
- private DateTime beginning = new DateTime(2000, 1, 1, 0, 0);
+ private static final DateTime BEGINNING = new DateTime(2000, 1, 1, 0, 0);
@Test
public void shortRide() throws Exception {
- DateTime oneMinLater = beginning.plusMinutes(1);
- TaxiRide rideStarted = startRide(1, beginning);
+ DateTime oneMinLater = BEGINNING.plusMinutes(1);
+ TaxiRide rideStarted = startRide(1, BEGINNING);
TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
Long markOneMinLater = oneMinLater.getMillis();
@@ -49,8 +49,8 @@
@Test
public void outOfOrder() throws Exception {
- DateTime oneMinLater = beginning.plusMinutes(1);
- TaxiRide rideStarted = startRide(1, beginning);
+ DateTime oneMinLater = BEGINNING.plusMinutes(1);
+ TaxiRide rideStarted = startRide(1, BEGINNING);
TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
Long markOneMinLater = oneMinLater.getMillis();
@@ -60,8 +60,8 @@
@Test
public void noStartShort() throws Exception {
- DateTime oneMinLater = beginning.plusMinutes(1);
- TaxiRide rideStarted = startRide(1, beginning);
+ DateTime oneMinLater = BEGINNING.plusMinutes(1);
+ TaxiRide rideStarted = startRide(1, BEGINNING);
TaxiRide endedOneMinLater = endRide(rideStarted, oneMinLater);
Long markOneMinLater = oneMinLater.getMillis();
@@ -71,8 +71,8 @@
@Test
public void noEnd() throws Exception {
- TaxiRide rideStarted = startRide(1, beginning);
- Long markThreeHoursLater = beginning.plusHours(3).getMillis();
+ TaxiRide rideStarted = startRide(1, BEGINNING);
+ Long markThreeHoursLater = BEGINNING.plusHours(3).getMillis();
TestRideSource source = new TestRideSource(rideStarted, markThreeHoursLater);
assertEquals(Collections.singletonList(rideStarted), results(source));
@@ -80,9 +80,9 @@
@Test
public void longRide() throws Exception {
- TaxiRide rideStarted = startRide(1, beginning);
- Long mark2HoursLater = beginning.plusMinutes(120).getMillis();
- TaxiRide rideEnded3HoursLater = endRide(rideStarted, beginning.plusHours(3));
+ TaxiRide rideStarted = startRide(1, BEGINNING);
+ Long mark2HoursLater = BEGINNING.plusMinutes(120).getMillis();
+ TaxiRide rideEnded3HoursLater = endRide(rideStarted, BEGINNING.plusHours(3));
TestRideSource source = new TestRideSource(rideStarted, mark2HoursLater, rideEnded3HoursLater);
assertEquals(Collections.singletonList(rideStarted), results(source));
@@ -102,7 +102,7 @@
protected List<TaxiRide> results(TestRideSource source) throws Exception {
Testable javaSolution = () -> LongRidesSolution.main(new String[]{});
- return runApp(source, new TestSink<>(), javaExercise, javaSolution);
+ return runApp(source, new TestSink<>(), JAVA_EXERCISE, javaSolution);
}
}
diff --git a/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java b/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
index cf3dca5..e16ea83 100644
--- a/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
+++ b/ride-cleansing/src/solution/java/org/apache/flink/training/solutions/ridecleansing/RideCleansingSolution.java
@@ -74,8 +74,7 @@
public static class NYCFilter implements FilterFunction<TaxiRide> {
@Override
- public boolean filter(TaxiRide taxiRide) throws Exception {
-
+ public boolean filter(TaxiRide taxiRide) {
return GeoUtils.isInNYC(taxiRide.startLon, taxiRide.startLat) &&
GeoUtils.isInNYC(taxiRide.endLon, taxiRide.endLat);
}
diff --git a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java b/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
index b0e151f..e4554d4 100644
--- a/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
+++ b/ride-cleansing/src/test/java/org/apache/flink/training/exercises/ridecleansing/RideCleansingTest.java
@@ -32,7 +32,7 @@
public class RideCleansingTest extends TaxiRideTestBase<TaxiRide> {
- static Testable javaExercise = () -> RideCleansingExercise.main(new String[]{});
+ static final Testable JAVA_EXERCISE = () -> RideCleansingExercise.main(new String[]{});
@Test
public void testInNYC() throws Exception {
@@ -61,7 +61,7 @@
protected List<?> results(TestRideSource source) throws Exception {
Testable javaSolution = () -> RideCleansingSolution.main(new String[]{});
- return runApp(source, new TestSink<>(), javaExercise, javaSolution);
+ return runApp(source, new TestSink<>(), JAVA_EXERCISE, javaSolution);
}
}
diff --git a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
index 961aa40..0254540 100644
--- a/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
+++ b/rides-and-fares/src/solution/java/org/apache/flink/training/solutions/ridesandfares/RidesAndFaresSolution.java
@@ -114,7 +114,7 @@
TaxiFare fare = fareState.value();
if (fare != null) {
fareState.clear();
- out.collect(new Tuple2(ride, fare));
+ out.collect(Tuple2.of(ride, fare));
} else {
rideState.update(ride);
}
@@ -125,7 +125,7 @@
TaxiRide ride = rideState.value();
if (ride != null) {
rideState.clear();
- out.collect(new Tuple2(ride, fare));
+ out.collect(Tuple2.of(ride, fare));
} else {
fareState.update(fare);
}
diff --git a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
index e1d270a..c2b1fb1 100644
--- a/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
+++ b/rides-and-fares/src/test/java/org/apache/flink/training/exercises/ridesandfares/RidesAndFaresTest.java
@@ -35,7 +35,7 @@
public class RidesAndFaresTest extends TaxiRideTestBase<Tuple2<TaxiRide, TaxiFare>> {
- static Testable javaExercise = () -> RidesAndFaresExercise.main(new String[]{});
+ static final Testable JAVA_EXERCISE = () -> RidesAndFaresExercise.main(new String[]{});
final TaxiRide ride1 = testRide(1);
@@ -49,8 +49,8 @@
TestFareSource fares = new TestFareSource(fare1, fare2);
List<Tuple2<TaxiRide, TaxiFare>> expected = Arrays.asList(
- new Tuple2<>(ride1, fare1),
- new Tuple2<>(ride2, fare2));
+ Tuple2.of(ride1, fare1),
+ Tuple2.of(ride2, fare2));
assertThat("Join results don't match", results(rides, fares), containsInAnyOrder(expected.toArray()));
}
@@ -61,8 +61,8 @@
TestFareSource fares = new TestFareSource(fare2, fare1);
List<Tuple2<TaxiRide, TaxiFare>> expected = Arrays.asList(
- new Tuple2<>(ride1, fare1),
- new Tuple2<>(ride2, fare2));
+ Tuple2.of(ride1, fare1),
+ Tuple2.of(ride2, fare2));
assertThat("Join results don't match", results(rides, fares), containsInAnyOrder(expected.toArray()));
}
@@ -78,7 +78,7 @@
protected List<?> results(TestRideSource rides, TestFareSource fares) throws Exception {
Testable javaSolution = () -> RidesAndFaresSolution.main(new String[]{});
- return runApp(rides, fares, new TestSink<>(), javaExercise, javaSolution);
+ return runApp(rides, fares, new TestSink<>(), JAVA_EXERCISE, javaSolution);
}
}
diff --git a/rides-and-fares/src/test/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresTest.scala b/rides-and-fares/src/test/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresTest.scala
index c162122..03f7e6a 100644
--- a/rides-and-fares/src/test/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresTest.scala
+++ b/rides-and-fares/src/test/scala/org/apache/flink/training/exercises/ridesandfares/scala/RidesAndFaresTest.scala
@@ -33,7 +33,7 @@
override protected def results(rides: TaxiRideTestBase.TestRideSource, fares: TaxiRideTestBase.TestFareSource): util.List[tuple.Tuple2[TaxiRide, TaxiFare]] = {
val scalaSolution: TaxiRideTestBase.Testable = () => RidesAndFaresSolution.main(Array.empty[String])
val tuples: util.List[_] = runApp(rides, fares, new TaxiRideTestBase.TestSink[tuple.Tuple2[TaxiRide, TaxiFare]], scalaExercise, scalaSolution)
- javaTuples(tuples.asInstanceOf[util.List[Tuple2[TaxiRide, TaxiFare]]])
+ javaTuples(tuples.asInstanceOf[util.List[(TaxiRide, TaxiFare)]])
}
private def javaTuples(a: util.List[(TaxiRide, TaxiFare)]): util.ArrayList[tuple.Tuple2[TaxiRide, TaxiFare]] = {