blob: 07d08e82e79e2b3de7b3e34a755ed750a5785e58 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.examples.complete.game;
import static org.apache.beam.vendor.guava.v20_0.com.google.common.base.MoreObjects.firstNonNull;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.examples.common.ExampleUtils;
import org.apache.beam.examples.complete.game.utils.GameConstants;
import org.apache.beam.examples.complete.game.utils.WriteToBigQuery.FieldInfo;
import org.apache.beam.examples.complete.game.utils.WriteWindowedToBigQuery;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.guava.v20_0.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Instant;
/**
* This class is part of a series of pipelines that tell a story in a gaming domain. Concepts
* include: stateful processing.
*
* <p>This pipeline processes an unbounded stream of 'game events'. It uses stateful processing to
* aggregate team scores per team and outputs team name and its total score every time the team
* passes a new multiple of a threshold score. For example, multiples of the threshold could be the
* corresponding scores required to pass each level of the game. By default, this threshold is set
* to 5000.
*
* <p>Stateful processing allows us to write pipelines that output based on runtime state (e.g. when
* a team reaches a certain score, or after every 100 game events) without time triggers. See
* https://beam.apache.org/blog/2017/02/13/stateful-processing.html for more information on using
* stateful processing.
*
* <p>Run {@code injector.Injector} to generate pubsub data for this pipeline. The Injector
* documentation provides more detail on how to do this.
*
* <p>To execute this pipeline, specify the pipeline configuration like this:
*
* <pre>{@code
* --project=YOUR_PROJECT_ID
* --tempLocation=gs://YOUR_TEMP_DIRECTORY
* --runner=YOUR_RUNNER
* --dataset=YOUR-DATASET
* --topic=projects/YOUR-PROJECT/topics/YOUR-TOPIC
* }</pre>
*
* <p>The BigQuery dataset you specify must already exist. The PubSub topic you specify should be
* the same topic to which the Injector is publishing.
*/
public class StatefulTeamScore extends LeaderBoard {
/**
* Options supported by {@link StatefulTeamScore}.
*
* <p>Adds the score threshold on top of everything {@link LeaderBoard.Options} already declares.
*/
public interface Options extends LeaderBoard.Options {
/**
* Returns the threshold score. {@code main} passes this value to {@link UpdateTeamScoreFn},
* which emits a team's total each time it passes a new multiple of it. Defaults to 5000.
*/
@Description("Numeric value, multiple of which is used as threshold for outputting team score.")
@Default.Integer(5000)
Integer getThresholdScore();
/** Sets the threshold score; see {@link #getThresholdScore()}. */
void setThresholdScore(Integer value);
}
/**
 * Builds the description of the BigQuery table that receives team score sums.
 *
 * <p>Each entry is keyed by a column name and carries a {@link FieldInfo} holding a type string
 * for that column and the function that produces its value from an output element.
 *
 * @return a mutable map with the {@code team}, {@code total_score} and {@code processing_time}
 *     columns
 */
private static Map<String, FieldInfo<KV<String, Integer>>> configureCompleteWindowedTableWrite() {
  Map<String, FieldInfo<KV<String, Integer>>> columns = new HashMap<>();

  // The team name is the key of the emitted KV.
  FieldInfo<KV<String, Integer>> teamColumn =
      new FieldInfo<>("STRING", (context, window) -> context.element().getKey());
  columns.put("team", teamColumn);

  // The team's running total is the value of the emitted KV.
  FieldInfo<KV<String, Integer>> totalScoreColumn =
      new FieldInfo<>("INTEGER", (context, window) -> context.element().getValue());
  columns.put("total_score", totalScoreColumn);

  // Wall-clock time at which the field function is evaluated, not the event's own timestamp.
  FieldInfo<KV<String, Integer>> processingTimeColumn =
      new FieldInfo<>(
          "STRING", (context, window) -> GameConstants.DATE_TIME_FORMATTER.print(Instant.now()));
  columns.put("processing_time", processingTimeColumn);

  return columns;
}
/**
 * Builds and runs the stateful team score pipeline.
 *
 * <p>The pipeline reads game events from the Pub/Sub topic named in the options, parses them,
 * keys them by team, runs them through {@link UpdateTeamScoreFn}, and writes the emitted totals
 * to the BigQuery table named {@code <leaderBoardTableName>_team_leader}.
 *
 * @param args command-line arguments, parsed and validated as {@link Options}
 * @throws Exception if building or running the pipeline fails
 */
public static void main(String[] args) throws Exception {
  Options pipelineOptions =
      PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
  // Streaming mode is forced on regardless of what was passed on the command line.
  pipelineOptions.setStreaming(true);
  ExampleUtils utils = new ExampleUtils(pipelineOptions);
  Pipeline p = Pipeline.create(pipelineOptions);

  // Element type produced by the keying step: <team name, game event>.
  TypeDescriptor<KV<String, GameActionInfo>> teamEventType =
      TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(GameActionInfo.class));

  p
      // Pull raw events from Pub/Sub; element timestamps come from the message attribute
      // rather than from publish time.
      .apply(
          PubsubIO.readStrings()
              .withTimestampAttribute(GameConstants.TIMESTAMP_ATTRIBUTE)
              .fromTopic(pipelineOptions.getTopic()))
      .apply("ParseGameEvent", ParDo.of(new ParseEventFn()))
      // State is partitioned by key, so key every event by its team name.
      .apply(
          "MapTeamAsKey",
          MapElements.into(teamEventType)
              .via((GameActionInfo event) -> KV.of(event.team, event)))
      // Emit <team, total> whenever the team's total passes a new multiple of the threshold.
      .apply(
          "UpdateTeamScore",
          ParDo.of(new UpdateTeamScoreFn(pipelineOptions.getThresholdScore())))
      // Persist the emitted totals to BigQuery.
      .apply(
          "WriteTeamLeaders",
          new WriteWindowedToBigQuery<>(
              pipelineOptions.as(GcpOptions.class).getProject(),
              pipelineOptions.getDataset(),
              pipelineOptions.getLeaderBoardTableName() + "_team_leader",
              configureCompleteWindowedTableWrite()));

  // Start the pipeline and wait for it to finish; cancellation requests from the command line
  // are captured while waiting.
  utils.waitToFinish(p.run());
}
/**
 * Tracks each team's score separately in a single state cell and outputs the score every time it
 * passes a new multiple of a threshold.
 *
 * <p>We use stateful {@link DoFn} because:
 *
 * <ul>
 *   <li>State is key-partitioned. Therefore, the score is calculated per team.
 *   <li>Stateful {@link DoFn} can determine when to output based on the state. This only allows
 *       outputting when a team's score passes a given threshold.
 * </ul>
 */
@VisibleForTesting
public static class UpdateTeamScoreFn
    extends DoFn<KV<String, GameActionInfo>, KV<String, Integer>> {

  private static final String TOTAL_SCORE = "totalScore";

  private final int thresholdScore;

  /**
   * Creates the function.
   *
   * @param thresholdScore a team's total is output each time it passes a new multiple of this
   *     value; must be positive
   * @throws IllegalArgumentException if {@code thresholdScore} is zero or negative
   */
  public UpdateTeamScoreFn(int thresholdScore) {
    // Fail at pipeline construction time instead of at element-processing time: a zero threshold
    // would make every element throw ArithmeticException (division by zero), and a negative one
    // would make the quotient comparison below never succeed, silently producing no output.
    if (thresholdScore <= 0) {
      throw new IllegalArgumentException(
          "thresholdScore must be positive, but was " + thresholdScore);
    }
    this.thresholdScore = thresholdScore;
  }

  /**
   * Describes the state for storing team score. Let's break down this statement.
   *
   * <p>{@link StateSpec} configures the state cell, which is provided by a runner during pipeline
   * execution.
   *
   * <p>{@link org.apache.beam.sdk.transforms.DoFn.StateId} annotation assigns an identifier to
   * the state, which is used to refer to the state in {@link
   * org.apache.beam.sdk.transforms.DoFn.ProcessElement}.
   *
   * <p>A {@link ValueState} stores a single value per key and per window. Because our pipeline is
   * globally windowed in this example, this {@link ValueState} is just key partitioned, with one
   * score per team. Any other class that extends {@link org.apache.beam.sdk.state.State} can be
   * used.
   *
   * <p>In order to store the value, the state must be encoded. Therefore, we provide a coder, in
   * this case the {@link VarIntCoder}. If the coder is not provided as in {@code
   * StateSpecs.value()}, Beam's coder inference will try to provide a coder automatically.
   */
  @StateId(TOTAL_SCORE)
  private final StateSpec<ValueState<Integer>> totalScoreSpec =
      StateSpecs.value(VarIntCoder.of());

  /**
   * Adds the incoming event's score to the team's running total and emits {@code <team, total>}
   * when the total has just passed a new multiple of the threshold.
   *
   * <p>To use a state cell, annotate a parameter with {@link
   * org.apache.beam.sdk.transforms.DoFn.StateId} that matches the state declaration. The type of
   * the parameter should match the {@link StateSpec} type.
   *
   * @param c the process context carrying the {@code <team, event>} element and the output
   * @param totalScore the state cell holding the running total for the current team
   */
  @ProcessElement
  public void processElement(
      ProcessContext c, @StateId(TOTAL_SCORE) ValueState<Integer> totalScore) {
    String teamName = c.element().getKey();
    GameActionInfo gInfo = c.element().getValue();

    // ValueState cells do not contain a default value. If the state is possibly not written, make
    // sure to check for null on read.
    int oldTotalScore = firstNonNull(totalScore.read(), 0);

    // Keep the new total in a local so it does not have to be read back from the state cell
    // after it has been written.
    int newTotalScore = oldTotalScore + gInfo.score;
    totalScore.write(newTotalScore);

    // Since there are no negative scores, the easiest way to check whether a team just passed a
    // new multiple of the threshold score is to compare the quotients of dividing total scores by
    // threshold before and after this aggregation. For example, if the total score was 1999,
    // the new total is 2002, and the threshold is 1000, 1999 / 1000 = 1, 2002 / 1000 = 2.
    // Therefore, this team passed the threshold.
    if (oldTotalScore / thresholdScore < newTotalScore / thresholdScore) {
      c.output(KV.of(teamName, newTotalScore));
    }
  }
}
}