| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.beam.examples.complete.game; |
| |
| import static org.hamcrest.Matchers.hasItem; |
| import static org.junit.Assert.assertThat; |
| |
| import com.google.common.collect.ImmutableMap; |
| import java.io.Serializable; |
| import org.apache.beam.examples.complete.game.LeaderBoard.CalculateTeamScores; |
| import org.apache.beam.examples.complete.game.LeaderBoard.CalculateUserScores; |
| import org.apache.beam.examples.complete.game.UserScore.GameActionInfo; |
| import org.apache.beam.sdk.coders.AvroCoder; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.sdk.testing.PAssert; |
| import org.apache.beam.sdk.testing.TestPipeline; |
| import org.apache.beam.sdk.testing.TestStream; |
| import org.apache.beam.sdk.transforms.PTransform; |
| import org.apache.beam.sdk.transforms.SerializableFunction; |
| import org.apache.beam.sdk.transforms.windowing.BoundedWindow; |
| import org.apache.beam.sdk.transforms.windowing.GlobalWindow; |
| import org.apache.beam.sdk.transforms.windowing.IntervalWindow; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.PCollection; |
| import org.apache.beam.sdk.values.TimestampedValue; |
| import org.joda.time.Duration; |
| import org.joda.time.Instant; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.runner.RunWith; |
| import org.junit.runners.JUnit4; |
| |
| /** |
| * Tests for {@link LeaderBoard}. |
| */ |
| @RunWith(JUnit4.class) |
| public class LeaderBoardTest implements Serializable { |
| private static final Duration ALLOWED_LATENESS = Duration.standardHours(1); |
| private static final Duration TEAM_WINDOW_DURATION = Duration.standardMinutes(20); |
| private Instant baseTime = new Instant(0); |
| |
| @Rule |
| public TestPipeline p = TestPipeline.create(); |
| /** |
| * Some example users, on two separate teams. |
| */ |
| private enum TestUser { |
| RED_ONE("scarlet", "red"), RED_TWO("burgundy", "red"), |
| BLUE_ONE("navy", "blue"), BLUE_TWO("sky", "blue"); |
| |
| private final String userName; |
| private final String teamName; |
| |
| TestUser(String userName, String teamName) { |
| this.userName = userName; |
| this.teamName = teamName; |
| } |
| |
| public String getUser() { |
| return userName; |
| } |
| |
| public String getTeam() { |
| return teamName; |
| } |
| } |
| |
| /** |
| * A test of the {@link CalculateTeamScores} {@link PTransform} when all of the elements arrive |
| * on time (ahead of the watermark). |
| */ |
| @Test |
| public void testTeamScoresOnTime() { |
| |
| TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class)) |
| // Start at the epoch |
| .advanceWatermarkTo(baseTime) |
| // add some elements ahead of the watermark |
| .addElements(event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)), |
| event(TestUser.BLUE_ONE, 2, Duration.standardMinutes(1)), |
| event(TestUser.RED_TWO, 3, Duration.standardSeconds(22)), |
| event(TestUser.BLUE_TWO, 5, Duration.standardMinutes(3))) |
| // The watermark advances slightly, but not past the end of the window |
| .advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(3))) |
| // Add some more on time elements |
| .addElements(event(TestUser.RED_ONE, 1, Duration.standardMinutes(4)), |
| event(TestUser.BLUE_ONE, 2, Duration.standardSeconds(270))) |
| // The window should close and emit an ON_TIME pane |
| .advanceWatermarkToInfinity(); |
| |
| PCollection<KV<String, Integer>> teamScores = p.apply(createEvents) |
| .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS)); |
| |
| String blueTeam = TestUser.BLUE_ONE.getTeam(); |
| String redTeam = TestUser.RED_ONE.getTeam(); |
| PAssert.that(teamScores) |
| .inOnTimePane(new IntervalWindow(baseTime, TEAM_WINDOW_DURATION)) |
| .containsInAnyOrder(KV.of(blueTeam, 12), KV.of(redTeam, 4)); |
| |
| p.run().waitUntilFinish(); |
| } |
| |
| /** |
| * A test of the {@link CalculateTeamScores} {@link PTransform} when all of the elements arrive |
| * on time, and the processing time advances far enough for speculative panes. |
| */ |
| @Test |
| public void testTeamScoresSpeculative() { |
| |
| TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class)) |
| // Start at the epoch |
| .advanceWatermarkTo(baseTime) |
| .addElements(event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)), |
| event(TestUser.BLUE_ONE, 2, Duration.standardMinutes(1))) |
| // Some time passes within the runner, which causes a speculative pane containing the blue |
| // team's score to be emitted |
| .advanceProcessingTime(Duration.standardMinutes(10)) |
| .addElements(event(TestUser.RED_TWO, 5, Duration.standardMinutes(3))) |
| // Some additional time passes and we get a speculative pane for the red team |
| .advanceProcessingTime(Duration.standardMinutes(12)) |
| .addElements(event(TestUser.BLUE_TWO, 3, Duration.standardSeconds(22))) |
| // More time passes and a speculative pane containing a refined value for the blue pane is |
| // emitted |
| .advanceProcessingTime(Duration.standardMinutes(10)) |
| // Some more events occur |
| .addElements(event(TestUser.RED_ONE, 4, Duration.standardMinutes(4)), |
| event(TestUser.BLUE_TWO, 2, Duration.standardMinutes(2))) |
| // The window closes and we get an ON_TIME pane that contains all of the updates |
| .advanceWatermarkToInfinity(); |
| |
| PCollection<KV<String, Integer>> teamScores = p.apply(createEvents) |
| .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS)); |
| |
| String blueTeam = TestUser.BLUE_ONE.getTeam(); |
| String redTeam = TestUser.RED_ONE.getTeam(); |
| IntervalWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION); |
| // The window contains speculative panes alongside the on-time pane |
| PAssert.that(teamScores) |
| .inWindow(window) |
| .containsInAnyOrder(KV.of(blueTeam, 10) /* The on-time blue pane */, |
| KV.of(redTeam, 9) /* The on-time red pane */, |
| KV.of(blueTeam, 5) /* The first blue speculative pane */, |
| KV.of(blueTeam, 8) /* The second blue speculative pane */, |
| KV.of(redTeam, 5) /* The red speculative pane */); |
| PAssert.that(teamScores) |
| .inOnTimePane(window) |
| .containsInAnyOrder(KV.of(blueTeam, 10), KV.of(redTeam, 9)); |
| |
| p.run().waitUntilFinish(); |
| } |
| |
| /** |
| * A test where elements arrive behind the watermark (late data), but before the end of the |
| * window. These elements are emitted on time. |
| */ |
| @Test |
| public void testTeamScoresUnobservablyLate() { |
| |
| BoundedWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION); |
| TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class)) |
| .advanceWatermarkTo(baseTime) |
| .addElements(event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)), |
| event(TestUser.BLUE_TWO, 5, Duration.standardMinutes(8)), |
| event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)), |
| event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(5))) |
| .advanceWatermarkTo(baseTime.plus(TEAM_WINDOW_DURATION).minus(Duration.standardMinutes(1))) |
| // These events are late, but the window hasn't closed yet, so the elements are in the |
| // on-time pane |
| .addElements(event(TestUser.RED_TWO, 2, Duration.ZERO), |
| event(TestUser.RED_TWO, 5, Duration.standardMinutes(1)), |
| event(TestUser.BLUE_TWO, 2, Duration.standardSeconds(90)), |
| event(TestUser.RED_TWO, 3, Duration.standardMinutes(3))) |
| .advanceWatermarkTo(baseTime.plus(TEAM_WINDOW_DURATION).plus(Duration.standardMinutes(1))) |
| .advanceWatermarkToInfinity(); |
| PCollection<KV<String, Integer>> teamScores = p.apply(createEvents) |
| .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS)); |
| |
| String blueTeam = TestUser.BLUE_ONE.getTeam(); |
| String redTeam = TestUser.RED_ONE.getTeam(); |
| // The On Time pane contains the late elements that arrived before the end of the window |
| PAssert.that(teamScores) |
| .inOnTimePane(window) |
| .containsInAnyOrder(KV.of(redTeam, 14), KV.of(blueTeam, 13)); |
| |
| p.run().waitUntilFinish(); |
| } |
| |
| /** |
| * A test where elements arrive behind the watermark (late data) after the watermark passes the |
| * end of the window, but before the maximum allowed lateness. These elements are emitted in a |
| * late pane. |
| */ |
| @Test |
| public void testTeamScoresObservablyLate() { |
| |
| Instant firstWindowCloses = baseTime.plus(ALLOWED_LATENESS).plus(TEAM_WINDOW_DURATION); |
| TestStream<GameActionInfo> createEvents = TestStream.create(AvroCoder.of(GameActionInfo.class)) |
| .advanceWatermarkTo(baseTime) |
| .addElements(event(TestUser.BLUE_ONE, 3, Duration.standardSeconds(3)), |
| event(TestUser.BLUE_TWO, 5, Duration.standardMinutes(8))) |
| .advanceProcessingTime(Duration.standardMinutes(10)) |
| .advanceWatermarkTo(baseTime.plus(Duration.standardMinutes(3))) |
| .addElements(event(TestUser.RED_ONE, 3, Duration.standardMinutes(1)), |
| event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)), |
| event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(5))) |
| .advanceWatermarkTo(firstWindowCloses.minus(Duration.standardMinutes(1))) |
| // These events are late but should still appear in a late pane |
| .addElements(event(TestUser.RED_TWO, 2, Duration.ZERO), |
| event(TestUser.RED_TWO, 5, Duration.standardMinutes(1)), |
| event(TestUser.RED_TWO, 3, Duration.standardMinutes(3))) |
| // A late refinement is emitted due to the advance in processing time, but the window has |
| // not yet closed because the watermark has not advanced |
| .advanceProcessingTime(Duration.standardMinutes(12)) |
| // These elements should appear in the final pane |
| .addElements(event(TestUser.RED_TWO, 9, Duration.standardMinutes(1)), |
| event(TestUser.RED_TWO, 1, Duration.standardMinutes(3))) |
| .advanceWatermarkToInfinity(); |
| |
| PCollection<KV<String, Integer>> teamScores = p.apply(createEvents) |
| .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS)); |
| |
| BoundedWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION); |
| String blueTeam = TestUser.BLUE_ONE.getTeam(); |
| String redTeam = TestUser.RED_ONE.getTeam(); |
| PAssert.that(teamScores) |
| .inWindow(window) |
| .satisfies((SerializableFunction<Iterable<KV<String, Integer>>, Void>) input -> { |
| // The final sums need not exist in the same pane, but must appear in the output |
| // PCollection |
| assertThat(input, hasItem(KV.of(blueTeam, 11))); |
| assertThat(input, hasItem(KV.of(redTeam, 27))); |
| return null; |
| }); |
| PAssert.thatMap(teamScores) |
| // The closing behavior of CalculateTeamScores precludes an inFinalPane matcher |
| .inOnTimePane(window) |
| .isEqualTo(ImmutableMap.<String, Integer>builder().put(redTeam, 7) |
| .put(blueTeam, 11) |
| .build()); |
| |
| // No final pane is emitted for the blue team, as all of their updates have been taken into |
| // account in earlier panes |
| PAssert.that(teamScores).inFinalPane(window).containsInAnyOrder(KV.of(redTeam, 27)); |
| |
| p.run().waitUntilFinish(); |
| } |
| |
| /** |
| * A test where elements arrive beyond the maximum allowed lateness. These elements are dropped |
| * within {@link CalculateTeamScores} and do not impact the final result. |
| */ |
| @Test |
| public void testTeamScoresDroppablyLate() { |
| |
| BoundedWindow window = new IntervalWindow(baseTime, TEAM_WINDOW_DURATION); |
| TestStream<GameActionInfo> infos = TestStream.create(AvroCoder.of(GameActionInfo.class)) |
| .addElements(event(TestUser.BLUE_ONE, 12, Duration.ZERO), |
| event(TestUser.RED_ONE, 3, Duration.ZERO)) |
| .advanceWatermarkTo(window.maxTimestamp()) |
| .addElements(event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)), |
| event(TestUser.BLUE_TWO, 3, Duration.ZERO), |
| event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3))) |
| // Move the watermark to the end of the window to output on time |
| .advanceWatermarkTo(baseTime.plus(TEAM_WINDOW_DURATION)) |
| // Move the watermark past the end of the allowed lateness plus the end of the window |
| .advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS) |
| .plus(TEAM_WINDOW_DURATION).plus(Duration.standardMinutes(1))) |
| // These elements within the expired window are droppably late, and will not appear in the |
| // output |
| .addElements( |
| event(TestUser.BLUE_TWO, 3, TEAM_WINDOW_DURATION.minus(Duration.standardSeconds(5))), |
| event(TestUser.RED_ONE, 7, Duration.standardMinutes(4))) |
| .advanceWatermarkToInfinity(); |
| PCollection<KV<String, Integer>> teamScores = p.apply(infos) |
| .apply(new CalculateTeamScores(TEAM_WINDOW_DURATION, ALLOWED_LATENESS)); |
| |
| String blueTeam = TestUser.BLUE_ONE.getTeam(); |
| String redTeam = TestUser.RED_ONE.getTeam(); |
| // Only one on-time pane and no late panes should be emitted |
| PAssert.that(teamScores) |
| .inWindow(window) |
| .containsInAnyOrder(KV.of(redTeam, 7), KV.of(blueTeam, 18)); |
| // No elements are added before the watermark passes the end of the window plus the allowed |
| // lateness, so no refinement should be emitted |
| PAssert.that(teamScores).inFinalPane(window).empty(); |
| |
| p.run().waitUntilFinish(); |
| } |
| |
| /** |
| * A test where elements arrive both on-time and late in {@link CalculateUserScores}, which emits |
| * output into the {@link GlobalWindow}. All elements that arrive should be taken into account, |
| * even if they arrive later than the maximum allowed lateness. |
| */ |
| @Test |
| public void testUserScore() { |
| |
| TestStream<GameActionInfo> infos = |
| TestStream.create(AvroCoder.of(GameActionInfo.class)) |
| .addElements( |
| event(TestUser.BLUE_ONE, 12, Duration.ZERO), |
| event(TestUser.RED_ONE, 3, Duration.ZERO)) |
| .advanceProcessingTime(Duration.standardMinutes(7)) |
| .addElements( |
| event(TestUser.RED_ONE, 4, Duration.standardMinutes(2)), |
| event(TestUser.BLUE_TWO, 3, Duration.ZERO), |
| event(TestUser.BLUE_ONE, 3, Duration.standardMinutes(3))) |
| .advanceProcessingTime(Duration.standardMinutes(5)) |
| .advanceWatermarkTo(baseTime.plus(ALLOWED_LATENESS).plus(Duration.standardHours(12))) |
| // Late elements are always observable within the global window - they arrive before |
| // the window closes, so they will appear in a pane, even if they arrive after the |
| // allowed lateness, and are taken into account alongside on-time elements |
| .addElements( |
| event(TestUser.RED_ONE, 3, Duration.standardMinutes(7)), |
| event(TestUser.RED_ONE, 2, (ALLOWED_LATENESS).plus(Duration.standardHours(13)))) |
| .advanceProcessingTime(Duration.standardMinutes(6)) |
| .addElements(event(TestUser.BLUE_TWO, 5, Duration.standardMinutes(12))) |
| .advanceProcessingTime(Duration.standardMinutes(20)) |
| .advanceWatermarkToInfinity(); |
| |
| PCollection<KV<String, Integer>> userScores = |
| p.apply(infos).apply(new CalculateUserScores(ALLOWED_LATENESS)); |
| |
| // User scores are emitted in speculative panes in the Global Window - this matcher choice |
| // ensures that panes emitted by the watermark advancing to positive infinity are not included, |
| // as that will not occur outside of tests |
| PAssert.that(userScores) |
| .inEarlyGlobalWindowPanes() |
| .containsInAnyOrder(KV.of(TestUser.BLUE_ONE.getUser(), 15), |
| KV.of(TestUser.RED_ONE.getUser(), 7), |
| KV.of(TestUser.RED_ONE.getUser(), 12), |
| KV.of(TestUser.BLUE_TWO.getUser(), 3), |
| KV.of(TestUser.BLUE_TWO.getUser(), 8)); |
| |
| p.run().waitUntilFinish(); |
| } |
| |
| @Test |
| public void testLeaderBoardOptions() { |
| PipelineOptionsFactory.as(LeaderBoard.Options.class); |
| } |
| |
| private TimestampedValue<GameActionInfo> event( |
| TestUser user, |
| int score, |
| Duration baseTimeOffset) { |
| return TimestampedValue.of(new GameActionInfo(user.getUser(), |
| user.getTeam(), |
| score, |
| baseTime.plus(baseTimeOffset).getMillis()), baseTime.plus(baseTimeOffset)); |
| } |
| } |