| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.beam.examples.complete.game; |
| |
| import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects.firstNonNull; |
| |
| import java.util.HashMap; |
| import java.util.Map; |
| import org.apache.beam.examples.common.ExampleUtils; |
| import org.apache.beam.examples.complete.game.utils.GameConstants; |
| import org.apache.beam.examples.complete.game.utils.WriteToBigQuery.FieldInfo; |
| import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery; |
| import org.apache.beam.sdk.Pipeline; |
| import org.apache.beam.sdk.PipelineResult; |
| import org.apache.beam.sdk.coders.VarIntCoder; |
| import org.apache.beam.sdk.extensions.gcp.options.GcpOptions; |
| import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO; |
| import org.apache.beam.sdk.options.Default; |
| import org.apache.beam.sdk.options.Description; |
| import org.apache.beam.sdk.options.PipelineOptionsFactory; |
| import org.apache.beam.sdk.state.StateSpec; |
| import org.apache.beam.sdk.state.StateSpecs; |
| import org.apache.beam.sdk.state.ValueState; |
| import org.apache.beam.sdk.transforms.DoFn; |
| import org.apache.beam.sdk.transforms.MapElements; |
| import org.apache.beam.sdk.transforms.ParDo; |
| import org.apache.beam.sdk.values.KV; |
| import org.apache.beam.sdk.values.TypeDescriptor; |
| import org.apache.beam.sdk.values.TypeDescriptors; |
| import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; |
| import org.joda.time.Instant; |
| |
| /** |
| * This class is part of a series of pipelines that tell a story in a gaming domain. Concepts |
| * include: stateful processing. |
| * |
| * <p>This pipeline processes an unbounded stream of 'game events'. It uses stateful processing to |
| * aggregate team scores per team and outputs team name and its total score every time the team |
| * passes a new multiple of a threshold score. For example, multiples of the threshold could be the |
| * corresponding scores required to pass each level of the game. By default, this threshold is set |
| * to 5000. |
| * |
| * <p>Stateful processing allows us to write pipelines that output based on a runtime state (when a |
| * team reaches a certain score, in every 100 game events etc) without time triggers. See |
| * https://beam.apache.org/blog/2017/02/13/stateful-processing.html for more information on using |
| * stateful processing. |
| * |
| * <p>Run {@code injector.Injector} to generate pubsub data for this pipeline. The Injector |
| * documentation provides more detail on how to do this. |
| * |
| * <p>To execute this pipeline, specify the pipeline configuration like this: |
| * |
| * <pre>{@code |
| * --project=YOUR_PROJECT_ID |
| * --tempLocation=gs://YOUR_TEMP_DIRECTORY |
| * --runner=YOUR_RUNNER |
| * --dataset=YOUR-DATASET |
| * --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC |
| * }</pre> |
| * |
| * <p>The BigQuery dataset you specify must already exist. The PubSub topic you specify should be |
| * the same topic to which the Injector is publishing. |
| */ |
| public class StatefulTeamScore extends LeaderBoard { |
| |
| /** Options supported by {@link StatefulTeamScore}. */ |
| public interface Options extends LeaderBoard.Options { |
| |
| @Description("Numeric value, multiple of which is used as threshold for outputting team score.") |
| @Default.Integer(5000) |
| Integer getThresholdScore(); |
| |
| void setThresholdScore(Integer value); |
| } |
| |
| /** |
| * Create a map of information that describes how to write pipeline output to BigQuery. This map |
| * is used to write team score sums. |
| */ |
| private static Map<String, FieldInfo<KV<String, Integer>>> configureCompleteWindowedTableWrite() { |
| |
| Map<String, WriteWindowedToBigQuery.FieldInfo<KV<String, Integer>>> tableConfigure = |
| new HashMap<>(); |
| tableConfigure.put( |
| "team", new WriteWindowedToBigQuery.FieldInfo<>("STRING", (c, w) -> c.element().getKey())); |
| tableConfigure.put( |
| "total_score", |
| new WriteWindowedToBigQuery.FieldInfo<>("INTEGER", (c, w) -> c.element().getValue())); |
| tableConfigure.put( |
| "processing_time", |
| new WriteWindowedToBigQuery.FieldInfo<>( |
| "STRING", (c, w) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now()))); |
| return tableConfigure; |
| } |
| |
| public static void main(String[] args) throws Exception { |
| |
| Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); |
| // Enforce that this pipeline is always run in streaming mode. |
| options.setStreaming(true); |
| ExampleUtils exampleUtils = new ExampleUtils(options); |
| Pipeline pipeline = Pipeline.create(options); |
| |
| pipeline |
| // Read game events from Pub/Sub using custom timestamps, which are extracted from the |
| // pubsub data elements, and parse the data. |
| .apply( |
| PubsubIO.readStrings() |
| .withTimestampAttribute(GameConstants.TIMESTAMP_ATTRIBUTE) |
| .fromTopic(options.getTopic())) |
| .apply("ParseGameEvent", ParDo.of(new ParseEventFn())) |
| // Create <team, GameActionInfo> mapping. UpdateTeamScore uses team name as key. |
| .apply( |
| "MapTeamAsKey", |
| MapElements.into( |
| TypeDescriptors.kvs( |
| TypeDescriptors.strings(), TypeDescriptor.of(GameActionInfo.class))) |
| .via((GameActionInfo gInfo) -> KV.of(gInfo.team, gInfo))) |
| // Outputs a team's score every time it passes a new multiple of the threshold. |
| .apply("UpdateTeamScore", ParDo.of(new UpdateTeamScoreFn(options.getThresholdScore()))) |
| // Write the results to BigQuery. |
| .apply( |
| "WriteTeamLeaders", |
| new WriteWindowedToBigQuery<>( |
| options.as(GcpOptions.class).getProject(), |
| options.getDataset(), |
| options.getLeaderBoardTableName() + "_team_leader", |
| configureCompleteWindowedTableWrite())); |
| |
| // Run the pipeline and wait for the pipeline to finish; capture cancellation requests from the |
| // command line. |
| PipelineResult result = pipeline.run(); |
| exampleUtils.waitToFinish(result); |
| } |
| |
| /** |
| * Tracks each team's score separately in a single state cell and outputs the score every time it |
| * passes a new multiple of a threshold. |
| * |
| * <p>We use stateful {@link DoFn} because: |
| * |
| * <ul> |
| * <li>State is key-partitioned. Therefore, the score is calculated per team. |
| * <li>Stateful {@link DoFn} can determine when to output based on the state. This only allows |
| * outputting when a team's score passes a given threshold. |
| * </ul> |
| */ |
| @VisibleForTesting |
| public static class UpdateTeamScoreFn |
| extends DoFn<KV<String, GameActionInfo>, KV<String, Integer>> { |
| |
| private static final String TOTAL_SCORE = "totalScore"; |
| private final int thresholdScore; |
| |
| public UpdateTeamScoreFn(int thresholdScore) { |
| this.thresholdScore = thresholdScore; |
| } |
| |
| /** |
| * Describes the state for storing team score. Let's break down this statement. |
| * |
| * <p>{@link StateSpec} configures the state cell, which is provided by a runner during pipeline |
| * execution. |
| * |
| * <p>{@link org.apache.beam.sdk.transforms.DoFn.StateId} annotation assigns an identifier to |
| * the state, which is used to refer the state in {@link |
| * org.apache.beam.sdk.transforms.DoFn.ProcessElement}. |
| * |
| * <p>A {@link ValueState} stores single value per key and per window. Because our pipeline is |
| * globally windowed in this example, this {@link ValueState} is just key partitioned, with one |
| * score per team. Any other class that extends {@link org.apache.beam.sdk.state.State} can be |
| * used. |
| * |
| * <p>In order to store the value, the state must be encoded. Therefore, we provide a coder, in |
| * this case the {@link VarIntCoder}. If the coder is not provided as in {@code |
| * StateSpecs.value()}, Beam's coder inference will try to provide a coder automatically. |
| */ |
| @StateId(TOTAL_SCORE) |
| private final StateSpec<ValueState<Integer>> totalScoreSpec = |
| StateSpecs.value(VarIntCoder.of()); |
| |
| /** |
| * To use a state cell, annotate a parameter with {@link |
| * org.apache.beam.sdk.transforms.DoFn.StateId} that matches the state declaration. The type of |
| * the parameter should match the {@link StateSpec} type. |
| */ |
| @ProcessElement |
| public void processElement( |
| ProcessContext c, @StateId(TOTAL_SCORE) ValueState<Integer> totalScore) { |
| String teamName = c.element().getKey(); |
| GameActionInfo gInfo = c.element().getValue(); |
| |
| // ValueState cells do not contain a default value. If the state is possibly not written, make |
| // sure to check for null on read. |
| int oldTotalScore = firstNonNull(totalScore.read(), 0); |
| totalScore.write(oldTotalScore + gInfo.score); |
| |
| // Since there are no negative scores, the easiest way to check whether a team just passed a |
| // new multiple of the threshold score is to compare the quotients of dividing total scores by |
| // threshold before and after this aggregation. For example, if the total score was 1999, |
| // the new total is 2002, and the threshold is 1000, 1999 / 1000 = 1, 2002 / 1000 = 2. |
| // Therefore, this team passed the threshold. |
| if (oldTotalScore / this.thresholdScore < totalScore.read() / this.thresholdScore) { |
| c.output(KV.of(teamName, totalScore.read())); |
| } |
| } |
| } |
| } |