blob: ba8582a6f29bd61b13dbcdc63b37838a3b59cf66 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.training.exercises.longrides;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.operators.KeyedProcessOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.training.exercises.common.datatypes.TaxiRide;
import org.apache.flink.training.exercises.testing.ComposedKeyedProcessFunction;
import org.apache.flink.training.solutions.longrides.LongRidesSolution;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.concurrent.ConcurrentLinkedQueue;

import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
// needed for the Scala tests to use scala.Long with this Java test
@SuppressWarnings({"rawtypes", "unchecked"})
public class LongRidesUnitTest extends LongRidesTestBase {
private KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> harness;
private final KeyedProcessFunction<Long, TaxiRide, Long> javaExercise =
new LongRidesExercise.AlertFunction();
private final KeyedProcessFunction<Long, TaxiRide, Long> javaSolution =
new LongRidesSolution.AlertFunction();
protected ComposedKeyedProcessFunction composedAlertFunction() {
return new ComposedKeyedProcessFunction<>(javaExercise, javaSolution);
}
@Before
public void setupTestHarness() throws Exception {
this.harness = setupHarness(composedAlertFunction());
}
@Test
public void shouldUseTimersAndState() throws Exception {
TaxiRide rideStarted = startRide(1, BEGINNING);
harness.processElement(rideStarted.asStreamRecord());
// check for state and timers
assertThat(harness.numEventTimeTimers()).isGreaterThan(0);
assertThat(harness.numKeyedStateEntries()).isGreaterThan(0);
TaxiRide endedOneMinuteLater = endRide(rideStarted, ONE_MINUTE_LATER);
harness.processElement(endedOneMinuteLater.asStreamRecord());
// in this case, state and timers should be gone now
assertThat(harness.numEventTimeTimers()).isZero();
assertThat(harness.numKeyedStateEntries()).isZero();
}
@Test
public void shouldNotAlertWithStartFirst() throws Exception {
TaxiRide rideStarted = startRide(1, BEGINNING);
TaxiRide endedOneMinuteLater = endRide(rideStarted, ONE_MINUTE_LATER);
harness.processElement(rideStarted.asStreamRecord());
harness.processElement(endedOneMinuteLater.asStreamRecord());
assertThat(harness.getOutput()).isEmpty();
}
@Test
public void shouldNotAlertWithEndFirst() throws Exception {
TaxiRide rideStarted = startRide(1, BEGINNING);
TaxiRide endedOneMinuteLater = endRide(rideStarted, ONE_MINUTE_LATER);
harness.processElement(endedOneMinuteLater.asStreamRecord());
harness.processElement(rideStarted.asStreamRecord());
assertThat(harness.getOutput()).isEmpty();
}
@Test
public void shouldAlertWithStartFirst() throws Exception {
TaxiRide rideStarted = startRide(1, BEGINNING);
TaxiRide endedThreeHoursLater = endRide(rideStarted, THREE_HOURS_LATER);
harness.processElement(rideStarted.asStreamRecord());
harness.processElement(endedThreeHoursLater.asStreamRecord());
assertThat(resultingRideId()).isEqualTo(rideStarted.rideId);
}
@Test
public void shouldAlertWithEndFirst() throws Exception {
TaxiRide rideStarted = startRide(1, BEGINNING);
TaxiRide endedThreeHoursLater = endRide(rideStarted, THREE_HOURS_LATER);
harness.processElement(endedThreeHoursLater.asStreamRecord());
harness.processElement(rideStarted.asStreamRecord());
assertThat(resultingRideId()).isEqualTo(rideStarted.rideId);
}
@Test
public void shouldNotAlertWithoutWatermarkOrEndEvent() throws Exception {
TaxiRide rideStarted = startRide(1, BEGINNING);
TaxiRide otherRideStartsThreeHoursLater = startRide(2, THREE_HOURS_LATER);
harness.processElement(rideStarted.asStreamRecord());
harness.processElement(otherRideStartsThreeHoursLater.asStreamRecord());
assertThat(harness.getOutput()).isEmpty();
Watermark mark2HoursLater =
new Watermark(BEGINNING.plusSeconds(2 * 60 * 60).toEpochMilli());
harness.processWatermark(mark2HoursLater);
StreamRecord<Long> rideIdAtTimeOfWatermark =
new StreamRecord<>(rideStarted.rideId, mark2HoursLater.getTimestamp());
assertThat(harness.getOutput()).containsExactly(rideIdAtTimeOfWatermark, mark2HoursLater);
}
@Test
public void shouldAlertOnWatermark() throws Exception {
TaxiRide startOfLongRide = startRide(1, BEGINNING);
harness.processElement(startOfLongRide.asStreamRecord());
// Can't be done properly without some managed keyed state
assertThat(harness.numKeyedStateEntries()).isGreaterThan(0);
// At this point there should be no output
ConcurrentLinkedQueue<Object> initialOutput = harness.getOutput();
assertThat(initialOutput).isEmpty();
Watermark mark2HoursLater =
new Watermark(BEGINNING.plusSeconds(2 * 60 * 60).toEpochMilli());
harness.processWatermark(mark2HoursLater);
// Check that the result is correct
StreamRecord<Long> rideIdAtTimeOfWatermark =
new StreamRecord<>(startOfLongRide.rideId, mark2HoursLater.getTimestamp());
assertThat(harness.getOutput()).containsExactly(rideIdAtTimeOfWatermark, mark2HoursLater);
}
private Long resultingRideId() {
ConcurrentLinkedQueue<Object> results = harness.getOutput();
assertThat(results.size())
.withFailMessage("Expecting test to have exactly one result")
.isEqualTo(1);
StreamRecord<Long> resultingRecord = (StreamRecord<Long>) results.element();
return resultingRecord.getValue();
}
private KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> setupHarness(
KeyedProcessFunction<Long, TaxiRide, Long> function) throws Exception {
KeyedProcessOperator<Long, TaxiRide, Long> operator = new KeyedProcessOperator<>(function);
KeyedOneInputStreamOperatorTestHarness<Long, TaxiRide, Long> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, r -> r.rideId, Types.LONG);
testHarness.setup();
testHarness.open();
return testHarness;
}
}