<!doctype html><html lang=en class=no-js><head><meta charset=utf-8><meta http-equiv=x-ua-compatible content="IE=edge"><meta name=viewport content="width=device-width,initial-scale=1"><title>Beam Mobile Gaming Example</title><meta name=description content="Apache Beam is an open source, unified model and set of language-specific SDKs for defining and executing data processing workflows, and also data ingestion and integration flows, supporting Enterprise Integration Patterns (EIPs) and Domain Specific Languages (DSLs). Dataflow pipelines simplify the mechanics of large-scale batch and streaming data processing and can run on a number of runtimes like Apache Flink, Apache Spark, and Google Cloud Dataflow (a cloud service). Beam also brings DSL in different languages, allowing users to easily implement their data integration processes."><link href="https://fonts.googleapis.com/css?family=Roboto:100,300,400,500,700" rel=stylesheet><link rel=preload href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css as=style><link href=/scss/main.min.408fddfe3e8a45f87a5a8c9a839d77db667c1c534e5e5cd0d957ffc3dd6c14cf.css rel=stylesheet integrity><script src=https://code.jquery.com/jquery-2.2.4.min.js></script><style>.body__contained img{max-width:100%}</style><script type=text/javascript src=/js/bootstrap.min.2979f9a6e32fc42c3e7406339ee9fe76b31d1b52059776a02b4a7fa6a4fd280a.js defer></script>
<script type=text/javascript src=/js/language-switch-v2.min.121952b7980b920320ab229551857669209945e39b05ba2b433a565385ca44c6.js defer></script>
<script type=text/javascript src=/js/fix-menu.min.039174b67107465f2090a493f91e126f7aa797f29420f9edab8a54d9dd4b3d2d.js defer></script>
<script type=text/javascript src=/js/section-nav.min.1405fd5e70fab5f6c54037c269b1d137487d8f3d1b3009032525f6db3fbce991.js defer></script>
<script type=text/javascript src=/js/page-nav.min.af231204c9c52c5089d53a4c02739eacbb7f939e3be1c6ffcc212e0ac4dbf879.js defer></script>
<script type=text/javascript src=/js/expandable-list.min.75a4526624a3b8898fe7fb9e3428c205b581f8b38c7926922467aef17eac69f2.js defer></script>
<script type=text/javascript src=/js/copy-to-clipboard.min.364c06423d7e8993fc42bb4abc38c03195bc8386db26d18774ce775d08d5b18d.js defer></script>
<script type=text/javascript src=/js/calendar.min.336664054fa0f52b08bbd4e3c59b5cb6d63dcfb2b4d602839746516b0817446b.js defer></script>
<script type=text/javascript src=/js/fix-playground-nested-scroll.min.0283f1037cb1b9d5074c6eaf041292b524a8148a7cdb803d5ccd6d1fc4eb3253.js defer></script>
<script type=text/javascript src=/js/anchor-content-jump-fix.min.22d3240f81632e4c11179b9d2aaf37a40da9414333c43aa97344e8b21a7df0e4.js defer></script>
<link rel=alternate type=application/rss+xml title="Apache Beam" href=/feed.xml><link rel=canonical href=/get-started/mobile-gaming-example/ data-proofer-ignore><link rel="shortcut icon" type=image/x-icon href=/images/favicon.ico><link rel=stylesheet href=https://use.fontawesome.com/releases/v5.4.1/css/all.css integrity=sha384-5sAR7xN1Nv6T6+dT2mhtzEpVJvfS3NScPQTrOxhwjIuvcA67KV2R5Jz6kr4abQsz crossorigin=anonymous><link rel=stylesheet href=https://unpkg.com/swiper@8/swiper-bundle.min.css><script async src=https://platform.twitter.com/widgets.js></script>
<script>(function(e,t,n,s,o,i,a){e.GoogleAnalyticsObject=o,e[o]=e[o]||function(){(e[o].q=e[o].q||[]).push(arguments)},e[o].l=1*new Date,i=t.createElement(n),a=t.getElementsByTagName(n)[0],i.async=1,i.src=s,a.parentNode.insertBefore(i,a)})(window,document,"script","//www.google-analytics.com/analytics.js","ga"),ga("create","UA-73650088-1","auto"),ga("send","pageview")</script><script>(function(e,t,n,s,o,i){e.hj=e.hj||function(){(e.hj.q=e.hj.q||[]).push(arguments)},e._hjSettings={hjid:2182187,hjsv:6},o=t.getElementsByTagName("head")[0],i=t.createElement("script"),i.async=1,i.src=n+e._hjSettings.hjid+s+e._hjSettings.hjsv,o.appendChild(i)})(window,document,"https://static.hotjar.com/c/hotjar-",".js?sv=")</script></head><body class=body data-spy=scroll data-target=.page-nav data-offset=0><nav class="navigation-bar-mobile header navbar navbar-fixed-top"><div class=navbar-header><a href=/ class=navbar-brand><img alt=Brand style=height:46px;width:43px src=/images/beam_logo_navbar_mobile.png></a>
<a class=navbar-link href=/get-started/>Get Started</a>
<a class=navbar-link href=/documentation/>Documentation</a>
<button type=button class="navbar-toggle menu-open" aria-expanded=false aria-controls=navbar onclick=openMenu()>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button></div><div class="navbar-mask closed"></div><div id=navbar class="navbar-container closed"><button type=button class=navbar-toggle aria-expanded=false aria-controls=navbar id=closeMenu>
<span class=sr-only>Toggle navigation</span>
<span class=icon-bar></span>
<span class=icon-bar></span>
<span class=icon-bar></span></button><ul class="nav navbar-nav"><li><div class=searchBar-mobile><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search></div></li><li><a class=navbar-link href=/about>About</a></li><li><a class=navbar-link href=/get-started/>Get Started</a></li><li><span class=navbar-link>Documentation</span><ul><li><a href=/documentation/>General</a></li><li><a href=/documentation/sdks/java/>Languages</a></li><li><a href=/documentation/runners/capability-matrix/>Runners</a></li><li><a href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><li><a class=navbar-link href=/roadmap/>Roadmap</a></li><li><a class=navbar-link href=/community/>Community</a></li><li><a class=navbar-link href=/contribute/>Contribute</a></li><li><a class=navbar-link href=/blog/>Blog</a></li><li><a class=navbar-link href=/case-studies/>Case Studies</a></li></ul><ul class="nav navbar-nav navbar-right"><li><a href=https://github.com/apache/beam/edit/master/website/www/site/content/en/get-started/mobile-gaming-example.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a></li><li class=dropdown><a href=# class=dropdown-toggle id=apache-dropdown data-toggle=dropdown role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class="dropdown-menu dropdown-menu-right"><li><a target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a target=_blank href=https://www.apache.org/security/>Security</a></li><li><a target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></ul></div></nav><nav class=navigation-bar-desktop><a href=/ class=navbar-logo><img src=/images/beam_logo_navbar.png alt="Beam Logo"></a><div class=navbar-bar-left><div class=navbar-links><a class=navbar-link href=/about>About</a>
<a class=navbar-link href=/get-started/>Get Started</a><li class="dropdown navbar-dropdown navbar-dropdown-documentation"><a href=# class="dropdown-toggle navbar-link" role=button aria-haspopup=true aria-expanded=false>Documentation
<span><svg xmlns="http://www.w3.org/2000/svg" width="12" height="11" fill="none" viewBox="0 0 12 11"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M10.666 4.535 5.847 9.108 1.444 4.535"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link href=/documentation/>General</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/sdks/java/>Languages</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/runners/capability-matrix/>Runners</a></li><li><a class=navbar-dropdown-menu-link href=/documentation/io/connectors/>I/O Connectors</a></li></ul></li><a class=navbar-link href=/roadmap/>Roadmap</a>
<a class=navbar-link href=/community/>Community</a>
<a class=navbar-link href=/contribute/>Contribute</a>
<a class=navbar-link href=/blog/>Blog</a>
<a class=navbar-link href=/case-studies/>Case Studies</a></div><div id=iconsBar><a type=button onclick=showSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M10.191 17c3.866.0 7-3.134 7-7s-3.134-7-7-7-7 3.134-7 7 3.134 7 7 7zm11 4-6-6"/></svg></a><a target=_blank href=https://github.com/apache/beam/edit/master/website/www/site/content/en/get-started/mobile-gaming-example.md data-proofer-ignore><svg xmlns="http://www.w3.org/2000/svg" width="25" height="24" fill="none" viewBox="0 0 25 24"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M4.543 20h4l10.5-10.5c.53-.53.828-1.25.828-2s-.298-1.47-.828-2-1.25-.828-2-.828-1.47.298-2 .828L4.543 16v4zm9.5-13.5 4 4"/></svg></a><li class="dropdown navbar-dropdown navbar-dropdown-apache"><a href=# class=dropdown-toggle role=button aria-haspopup=true aria-expanded=false><img src=https://www.apache.org/foundation/press/kit/feather_small.png alt="Apache Logo" style=height:20px>
&nbsp;Apache
<span class=arrow-icon><svg xmlns="http://www.w3.org/2000/svg" width="20" height="20" fill="none" viewBox="0 0 20 20"><circle cx="10" cy="10" r="10" fill="#ff6d00"/><path stroke="#fff" stroke-linecap="round" stroke-linejoin="round" stroke-width="2" d="M8.535 5.28l4.573 4.818-4.573 4.403"/></svg></span></a><ul class=dropdown-menu><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/>ASF Homepage</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/licenses/>License</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/security/>Security</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/thanks.html>Thanks</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/sponsorship.html>Sponsorship</a></li><li><a class=navbar-dropdown-menu-link target=_blank href=https://www.apache.org/foundation/policies/conduct>Code of Conduct</a></li></ul></li></div><div class="searchBar disappear"><script>(function(){var t,n="012923275103528129024:4emlchv9wzi",e=document.createElement("script");e.type="text/javascript",e.async=!0,e.src="https://cse.google.com/cse.js?cx="+n,t=document.getElementsByTagName("script")[0],t.parentNode.insertBefore(e,t)})()</script><gcse:search></gcse:search>
<a type=button onclick=endSearch()><svg xmlns="http://www.w3.org/2000/svg" width="25" height="25" fill="none" viewBox="0 0 25 25"><path stroke="#ff6d00" stroke-linecap="round" stroke-linejoin="round" stroke-width="2.75" d="M21.122 20.827 4.727 4.432M21.122 4.43 4.727 20.827"/></svg></a></div></div></nav><div class=header-push></div><div class="top-banners swiper"><div class=swiper-wrapper><div class=swiper-slide><a href=https://tour.beam.apache.org><img class=banner-img-desktop src=/images/banners/tour-of-beam/tour-of-beam-desktop.png alt="Start Tour of Beam">
<img class=banner-img-mobile src=/images/banners/tour-of-beam/tour-of-beam-mobile.png alt="Start Tour of Beam"></a></div><div class=swiper-slide><a href=https://beam.apache.org/documentation/ml/overview/><img class=banner-img-desktop src=/images/banners/machine-learning/machine-learning-desktop.jpg alt="Machine Learning">
<img class=banner-img-mobile src=/images/banners/machine-learning/machine-learning-mobile.jpg alt="Machine Learning"></a></div></div><div class=swiper-pagination></div><div class=swiper-button-prev></div><div class=swiper-button-next></div></div><script src=/js/swiper-bundle.min.min.e0e8f81b0b15728d35ff73c07f42ddbb17a108d6f23df4953cb3e60df7ade675.js></script>
<script src=/js/sliders/top-banners.min.afa7d0a19acf7a3b28ca369490b3d401a619562a2a4c9612577be2f66a4b9855.js></script>
<script>function showSearch(){addPlaceholder();var e,t=document.querySelector(".searchBar");t.classList.remove("disappear"),e=document.querySelector("#iconsBar"),e.classList.add("disappear")}function addPlaceholder(){$("input:text").attr("placeholder","What are you looking for?")}function endSearch(){var e,t=document.querySelector(".searchBar");t.classList.add("disappear"),e=document.querySelector("#iconsBar"),e.classList.remove("disappear")}function blockScroll(){$("body").toggleClass("fixedPosition")}function openMenu(){addPlaceholder(),blockScroll()}</script><div class="clearfix container-main-content"><div class="section-nav closed" data-offset-top=90 data-offset-bottom=500><span class="section-nav-back glyphicon glyphicon-menu-left"></span><nav><ul class=section-nav-list data-section-nav><li><span class=section-nav-list-main-title>Get started</span></li><li><a href=/get-started/beam-overview/>Beam Overview</a></li><li><a href=/get-started/an-interactive-overview-of-beam/>An Interactive Overview of Beam</a></li><li><span class=section-nav-list-title>Quickstarts</span><ul class=section-nav-list><li><a href=https://tour.beam.apache.org>Tour of Beam</a></li><li><a href=/get-started/try-apache-beam/>Try Apache Beam</a></li><li><a href=/get-started/try-beam-playground/>Try Beam Playground</a></li><li><a href=/get-started/quickstart/java/>Java quickstart</a></li><li><a href=/get-started/quickstart/python/>Python quickstart</a></li><li><a href=/get-started/quickstart/go/>Go quickstart</a></li><li><a href=/get-started/quickstart/typescript/>Typescript quickstart</a></li><li><a href=/get-started/from-spark/>Apache Spark</a></li><li><a href=/get-started/quickstart-java/>WordCount (Java)</a></li><li><a href=/get-started/quickstart-py/>WordCount (Python)</a></li><li><a href=/get-started/quickstart-go/>WordCount (Go)</a></li></ul></li><li><a href=/get-started/downloads>Install the SDK</a></li><li><span class=section-nav-list-title>Tutorials</span><ul 
class=section-nav-list><li><a href=/get-started/wordcount-example/>WordCount</a></li><li><a href=/get-started/mobile-gaming-example/>Mobile Gaming</a></li></ul></li><li class=section-nav-item--collapsible><span class=section-nav-list-title>Learning resources</span><ul class=section-nav-list><li><a href=/get-started/resources/learning-resources/#getting-started>Getting Started</a></li><li><a href=/get-started/resources/learning-resources/#articles>Articles</a></li><li><a href=/get-started/resources/learning-resources/#videos>Videos</a></li><li><a href=/get-started/resources/learning-resources/#courses>Courses</a></li><li><a href=/get-started/resources/learning-resources/#books>Books</a></li><li><a href=/get-started/resources/learning-resources/#certifications>Certifications</a></li><li><a href=/get-started/resources/learning-resources/#interactive-labs>Interactive Labs</a></li><li><a href=/get-started/resources/learning-resources/#beam-katas>Beam Katas</a></li><li><a href=/get-started/resources/learning-resources/#code-examples>Code Examples</a></li><li><a href=/get-started/resources/learning-resources/#api-reference>API Reference</a></li><li><a href=/get-started/resources/learning-resources/#feedback-and-suggestions>Feedback and Suggestions</a></li><li><a href=/get-started/resources/learning-resources/#how-to-contribute>How to Contribute</a></li><li><a href=/get-started/resources/videos-and-podcasts>Videos and Podcasts</a></li></ul></li><li><a href=/security>Security</a></li></ul></nav></div><nav class="page-nav clearfix" data-offset-top=90 data-offset-bottom=500><nav id=TableOfContents><ul><li><a href=#userscore-basic-score-processing-in-batch>UserScore: Basic Score Processing in Batch</a><ul><li><a href=#what-does-userscore-do>What Does UserScore Do?</a></li><li><a href=#limitations>Limitations</a></li></ul></li><li><a href=#hourlyteamscore-advanced-processing-in-batch-with-windowing>HourlyTeamScore: Advanced Processing in Batch with Windowing</a><ul><li><a 
href=#what-does-hourlyteamscore-do>What Does HourlyTeamScore Do?</a><ul><li><a href=#fixed-time-windowing>Fixed-Time Windowing</a></li><li><a href=#filtering-based-on-event-time>Filtering Based On Event Time</a></li><li><a href=#calculating-score-per-team-per-window>Calculating Score Per Team, Per Window</a></li></ul></li><li><a href=#limitations-1>Limitations</a></li></ul></li><li><a href=#leaderboard-streaming-processing-with-real-time-game-data>LeaderBoard: Streaming Processing with Real-Time Game Data</a><ul><li><a href=#what-does-leaderboard-do>What Does LeaderBoard Do?</a><ul><li><a href=#calculating-user-score-based-on-processing-time>Calculating User Score based on Processing Time</a></li><li><a href=#calculating-team-score-based-on-event-time>Calculating Team Score based on Event Time</a></li></ul></li></ul></li><li><a href=#gamestats-abuse-detection-and-usage-analysis>GameStats: Abuse Detection and Usage Analysis</a><ul><li><a href=#what-does-gamestats-do>What Does GameStats Do?</a><ul><li><a href=#abuse-detection>Abuse Detection</a></li><li><a href=#analyzing-usage-patterns>Analyzing Usage Patterns</a></li></ul></li></ul></li><li><a href=#next-steps>Next Steps</a></li></ul></nav></nav><div class="body__contained body__section-nav"><h1 id=apache-beam-mobile-gaming-pipeline-examples>Apache Beam Mobile Gaming Pipeline Examples</h1><nav id=TableOfContents><ul><li><a href=#userscore-basic-score-processing-in-batch>UserScore: Basic Score Processing in Batch</a><ul><li><a href=#what-does-userscore-do>What Does UserScore Do?</a></li><li><a href=#limitations>Limitations</a></li></ul></li><li><a href=#hourlyteamscore-advanced-processing-in-batch-with-windowing>HourlyTeamScore: Advanced Processing in Batch with Windowing</a><ul><li><a href=#what-does-hourlyteamscore-do>What Does HourlyTeamScore Do?</a><ul><li><a href=#fixed-time-windowing>Fixed-Time Windowing</a></li><li><a href=#filtering-based-on-event-time>Filtering Based On Event Time</a></li><li><a 
href=#calculating-score-per-team-per-window>Calculating Score Per Team, Per Window</a></li></ul></li><li><a href=#limitations-1>Limitations</a></li></ul></li><li><a href=#leaderboard-streaming-processing-with-real-time-game-data>LeaderBoard: Streaming Processing with Real-Time Game Data</a><ul><li><a href=#what-does-leaderboard-do>What Does LeaderBoard Do?</a><ul><li><a href=#calculating-user-score-based-on-processing-time>Calculating User Score based on Processing Time</a></li><li><a href=#calculating-team-score-based-on-event-time>Calculating Team Score based on Event Time</a></li></ul></li></ul></li><li><a href=#gamestats-abuse-detection-and-usage-analysis>GameStats: Abuse Detection and Usage Analysis</a><ul><li><a href=#what-does-gamestats-do>What Does GameStats Do?</a><ul><li><a href=#abuse-detection>Abuse Detection</a></li><li><a href=#analyzing-usage-patterns>Analyzing Usage Patterns</a></li></ul></li></ul></li><li><a href=#next-steps>Next Steps</a></li></ul></nav><nav class=language-switcher><strong>Adapt for:</strong><ul><li data-value=java class=active>Java SDK</li><li data-value=py>Python SDK</li></ul></nav><p>This section provides a walkthrough of a series of example Apache Beam pipelines that demonstrate more complex functionality than the basic <a href=/get-started/wordcount-example>WordCount</a> examples. The pipelines in this section process data from a hypothetical game that users play on their mobile phones. The pipelines demonstrate processing at increasing levels of complexity; the first pipeline, for example, shows how to run a batch analysis job to obtain relatively simple score data, while the later pipelines use Beam&rsquo;s windowing and trigger features to provide low-latency data analysis and more complex intelligence about users&rsquo; play patterns.</p><p class=language-java><blockquote><p><strong>Note</strong>: These examples assume some familiarity with the Beam programming model. 
If you haven&rsquo;t already, we recommend familiarizing yourself with the programming model documentation and running a basic example pipeline before continuing. Note also that these examples use the Java 8 lambda syntax, and thus require Java 8. However, you can create pipelines with equivalent functionality using Java 7.</p></blockquote></p><p class=language-py><blockquote><p><strong>Note</strong>: These examples assume some familiarity with the Beam programming model. If you haven&rsquo;t already, we recommend familiarizing yourself with the programming model documentation and running a basic example pipeline before continuing.</p></blockquote></p><blockquote><p><strong>Note</strong>: MobileGaming is not yet available for the Go SDK. There is an open issue for this
(<a href=https://github.com/apache/beam/issues/18806>Issue 18806</a>).</p></blockquote><p>Every time a user plays an instance of our hypothetical mobile game, they generate a data event. Each data event consists of the following information:</p><ul><li>The unique ID of the user playing the game.</li><li>The team ID for the team to which the user belongs.</li><li>A score value for that particular instance of play.</li><li>A timestamp that records when the particular instance of play happened&ndash;this is the event time for each game data event.</li></ul><p>When the user completes an instance of the game, their phone sends the data event to a game server, where the data is logged and stored in a file. Generally, the data is sent to the game server immediately upon completion. However, network delays can occur at various points. Another possible scenario involves users who play the game &ldquo;offline&rdquo;, when their phones are out of contact with the server (such as on an airplane or outside a network coverage area). When a user&rsquo;s phone comes back into contact with the game server, the phone sends all accumulated game data. In these cases, some data events may arrive delayed and out of order.</p><p>The following diagram shows the ideal situation (events are processed as they occur) vs. reality (there is often a time delay before processing).</p><p><img src=/images/gaming-example-basic.png alt="There is often a time delay before processing events."></p><p><em>Figure 1: The X-axis represents event time: the actual time a game event
occurred. The Y-axis represents processing time: the time at which a game event
was processed. Ideally, events should be processed as they occur, depicted by
the dotted line in the diagram. However, in reality that is not the case and it
looks more like what is depicted by the red squiggly line above the ideal line.</em></p><p>The data events might be received by the game server significantly later than users generate them. This time difference (called <strong>skew</strong>) can have processing implications for pipelines that make calculations that consider when each score was generated. Such pipelines might track scores generated during each hour of a day, for example, or calculate the length of time that users are continuously playing the game; both calculations depend on each data record&rsquo;s event time.</p><p>Because some of our example pipelines use data files (like logs from the game server) as input, the event timestamp for each game might be embedded in the data; that is, it&rsquo;s a field in each data record. Those pipelines need to parse the event timestamp from each data record after reading it from the input file.</p><p>For pipelines that read game data from an unbounded source, the data source sets the intrinsic <a href=/documentation/programming-guide/#element-timestamps>timestamp</a> for each PCollection element to the appropriate event time.</p><p>The Mobile Gaming example pipelines vary in complexity, from simple batch analysis to more complex pipelines that can perform real-time analysis and abuse detection. This section walks you through each example and demonstrates how to use Beam features like windowing and triggers to expand your pipeline&rsquo;s capabilities.</p><h2 id=userscore-basic-score-processing-in-batch>UserScore: Basic Score Processing in Batch</h2><p>The <code>UserScore</code> pipeline is the simplest example for processing mobile game data. <code>UserScore</code> determines the total score per user over a finite data set (for example, one day&rsquo;s worth of scores stored on the game server). Pipelines like <code>UserScore</code> are best run periodically after all relevant data has been gathered. 
For example, <code>UserScore</code> could run as a nightly job over data gathered during that day.</p><p class=language-java><blockquote><p><strong>Note:</strong> See <a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/game/UserScore.java>UserScore on GitHub</a> for the complete example pipeline program.</p></blockquote></p><p class=language-py><blockquote><p><strong>Note:</strong> See <a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/user_score.py>UserScore on GitHub</a> for the complete example pipeline program.</p></blockquote></p><h3 id=what-does-userscore-do>What Does UserScore Do?</h3><p>In a day&rsquo;s worth of scoring data, each user ID may have multiple records (if the user plays more than one instance of the game during the analysis window), each with its own score value and timestamp. If we want to determine the total score over all the instances a user plays during the day, our pipeline will need to group all the records together per individual user.</p><p>As the pipeline processes each event, the event&rsquo;s score is added to the running total for that particular user.</p><p><code>UserScore</code> parses out only the data that it needs from each record, specifically the user ID and the score value. 
The pipeline doesn&rsquo;t consider the event time for any record; it simply processes all data present in the input files that you specify when you run the pipeline.</p><blockquote><p><strong>Note:</strong> To use the <code>UserScore</code> pipeline effectively, you need to supply input data that has already been grouped by the desired event time period; that is, specify an input file that contains only data from the day you care about.</p></blockquote><p><code>UserScore</code>&rsquo;s basic pipeline flow does the following:</p><ol><li>Read the day&rsquo;s score data from a text file.</li><li>Sum the score values for each unique user by grouping each game event by user ID and combining the score values to get the total score for that particular user.</li><li>Write the result data to a text file.</li></ol><p>The following diagram shows score data for several users over the pipeline analysis period. In the diagram, each data point is an event that results in one user/score pair.</p><img src=/images/gaming-example.gif alt="A pipeline processes score data for three users." width=850px><p><em>Figure 2: Score data for three users.</em></p><p>This example uses batch processing, and the diagram&rsquo;s Y-axis represents processing time: the pipeline processes events lower on the Y-axis first, and events higher up the axis later. The diagram&rsquo;s X-axis represents the event time for each game event, as denoted by that event&rsquo;s timestamp. Note that the pipeline does not process the individual events in the diagram in the order in which they occurred (according to their timestamps).</p><p>After reading the score events from the input file, the pipeline groups all of those user/score pairs together and sums the score values into one total value per unique user. 
<code>UserScore</code> encapsulates the core logic for that step as the <a href=/documentation/programming-guide/#composite-transforms>user-defined composite transform</a> <code>ExtractAndSumScore</code>:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=kd>public</span> <span class=kd>static</span> <span class=kd>class</span> <span class=nc>ExtractAndSumScore</span>
</span></span><span class=line><span class=cl>    <span class=kd>extends</span> <span class=n>PTransform</span><span class=o>&lt;</span><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>GameActionInfo</span><span class=o>&gt;,</span> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Integer</span><span class=o>&gt;&gt;&gt;</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=kd>private</span> <span class=kd>final</span> <span class=n>String</span> <span class=n>field</span><span class=o>;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=n>ExtractAndSumScore</span><span class=o>(</span><span class=n>String</span> <span class=n>field</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>    <span class=k>this</span><span class=o>.</span><span class=na>field</span> <span class=o>=</span> <span class=n>field</span><span class=o>;</span>
</span></span><span class=line><span class=cl>  <span class=o>}</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=nd>@Override</span>
</span></span><span class=line><span class=cl>  <span class=kd>public</span> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Integer</span><span class=o>&gt;&gt;</span> <span class=nf>expand</span><span class=o>(</span><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>GameActionInfo</span><span class=o>&gt;</span> <span class=n>gameInfo</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>    <span class=k>return</span> <span class=n>gameInfo</span>
</span></span><span class=line><span class=cl>        <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>            <span class=n>MapElements</span><span class=o>.</span><span class=na>into</span><span class=o>(</span>
</span></span><span class=line><span class=cl>                    <span class=n>TypeDescriptors</span><span class=o>.</span><span class=na>kvs</span><span class=o>(</span><span class=n>TypeDescriptors</span><span class=o>.</span><span class=na>strings</span><span class=o>(),</span> <span class=n>TypeDescriptors</span><span class=o>.</span><span class=na>integers</span><span class=o>()))</span>
</span></span><span class=line><span class=cl>                <span class=o>.</span><span class=na>via</span><span class=o>((</span><span class=n>GameActionInfo</span> <span class=n>gInfo</span><span class=o>)</span> <span class=o>-&gt;</span> <span class=n>KV</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>gInfo</span><span class=o>.</span><span class=na>getKey</span><span class=o>(</span><span class=n>field</span><span class=o>),</span> <span class=n>gInfo</span><span class=o>.</span><span class=na>getScore</span><span class=o>())))</span>
</span></span><span class=line><span class=cl>        <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>Sum</span><span class=o>.</span><span class=na>integersPerKey</span><span class=o>());</span>
</span></span><span class=line><span class=cl>  <span class=o>}</span>
</span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=k>class</span> <span class=nc>ExtractAndSumScore</span><span class=p>(</span><span class=n>beam</span><span class=o>.</span><span class=n>PTransform</span><span class=p>):</span>
</span></span><span class=line><span class=cl>  <span class=s2>&#34;&#34;&#34;A transform to extract key/score information and sum the scores.
</span></span></span><span class=line><span class=cl><span class=s2>  The constructor argument `field` determines whether &#39;team&#39; or &#39;user&#39; info is
</span></span></span><span class=line><span class=cl><span class=s2>  extracted.
</span></span></span><span class=line><span class=cl><span class=s2>  &#34;&#34;&#34;</span>
</span></span><span class=line><span class=cl>  <span class=k>def</span> <span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>field</span><span class=p>):</span>
</span></span><span class=line><span class=cl>    <span class=c1># TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.</span>
</span></span><span class=line><span class=cl>    <span class=c1># super().__init__()</span>
</span></span><span class=line><span class=cl>    <span class=n>beam</span><span class=o>.</span><span class=n>PTransform</span><span class=o>.</span><span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>)</span>
</span></span><span class=line><span class=cl>    <span class=bp>self</span><span class=o>.</span><span class=n>field</span> <span class=o>=</span> <span class=n>field</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=k>def</span> <span class=nf>expand</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>pcoll</span><span class=p>):</span>
</span></span><span class=line><span class=cl>    <span class=k>return</span> <span class=p>(</span>
</span></span><span class=line><span class=cl>        <span class=n>pcoll</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>Map</span><span class=p>(</span><span class=k>lambda</span> <span class=n>elem</span><span class=p>:</span> <span class=p>(</span><span class=n>elem</span><span class=p>[</span><span class=bp>self</span><span class=o>.</span><span class=n>field</span><span class=p>],</span> <span class=n>elem</span><span class=p>[</span><span class=s1>&#39;score&#39;</span><span class=p>]))</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>CombinePerKey</span><span class=p>(</span><span class=nb>sum</span><span class=p>))</span></span></span></code></pre></div></div></div><p><code>ExtractAndSumScore</code> is written to be more general, in that you can pass in the field by which you want to group the data (in the case of our game, by unique user or unique team). This means we can re-use <code>ExtractAndSumScore</code> in other pipelines that group score data by team, for example.</p><p>Here&rsquo;s the main method of <code>UserScore</code>, showing how we apply all three steps of the pipeline:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=kd>public</span> <span class=kd>static</span> <span class=kt>void</span> <span class=nf>main</span><span class=o>(</span><span class=n>String</span><span class=o>[]</span> <span class=n>args</span><span class=o>)</span> <span class=kd>throws</span> <span class=n>Exception</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>  <span class=c1>// Begin constructing a pipeline configured by commandline flags.
</span></span></span><span class=line><span class=cl><span class=c1></span>  <span class=n>Options</span> <span class=n>options</span> <span class=o>=</span> <span class=n>PipelineOptionsFactory</span><span class=o>.</span><span class=na>fromArgs</span><span class=o>(</span><span class=n>args</span><span class=o>).</span><span class=na>withValidation</span><span class=o>().</span><span class=na>as</span><span class=o>(</span><span class=n>Options</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl>  <span class=n>Pipeline</span> <span class=n>pipeline</span> <span class=o>=</span> <span class=n>Pipeline</span><span class=o>.</span><span class=na>create</span><span class=o>(</span><span class=n>options</span><span class=o>);</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=c1>// Read events from a text file and parse them.
</span></span></span><span class=line><span class=cl><span class=c1></span>  <span class=n>pipeline</span>
</span></span><span class=line><span class=cl>      <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>TextIO</span><span class=o>.</span><span class=na>read</span><span class=o>().</span><span class=na>from</span><span class=o>(</span><span class=n>options</span><span class=o>.</span><span class=na>getInput</span><span class=o>()))</span>
</span></span><span class=line><span class=cl>      <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;ParseGameEvent&#34;</span><span class=o>,</span> <span class=n>ParDo</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=k>new</span> <span class=n>ParseEventFn</span><span class=o>()))</span>
</span></span><span class=line><span class=cl>      <span class=c1>// Extract and sum username/score pairs from the event data.
</span></span></span><span class=line><span class=cl><span class=c1></span>      <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;ExtractUserScore&#34;</span><span class=o>,</span> <span class=k>new</span> <span class=n>ExtractAndSumScore</span><span class=o>(</span><span class=s>&#34;user&#34;</span><span class=o>))</span>
</span></span><span class=line><span class=cl>      <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>          <span class=s>&#34;WriteUserScoreSums&#34;</span><span class=o>,</span> <span class=k>new</span> <span class=n>WriteToText</span><span class=o>&lt;&gt;(</span><span class=n>options</span><span class=o>.</span><span class=na>getOutput</span><span class=o>(),</span> <span class=n>configureOutput</span><span class=o>(),</span> <span class=kc>false</span><span class=o>));</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=c1>// Run the batch pipeline.
</span></span></span><span class=line><span class=cl><span class=c1></span>  <span class=n>pipeline</span><span class=o>.</span><span class=na>run</span><span class=o>().</span><span class=na>waitUntilFinish</span><span class=o>();</span>
</span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=k>def</span> <span class=nf>run</span><span class=p>(</span><span class=n>argv</span><span class=o>=</span><span class=kc>None</span><span class=p>,</span> <span class=n>save_main_session</span><span class=o>=</span><span class=kc>True</span><span class=p>):</span>
</span></span><span class=line><span class=cl>  <span class=s2>&#34;&#34;&#34;Main entry point; defines and runs the user_score pipeline.&#34;&#34;&#34;</span>
</span></span><span class=line><span class=cl>  <span class=n>parser</span> <span class=o>=</span> <span class=n>argparse</span><span class=o>.</span><span class=n>ArgumentParser</span><span class=p>()</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=c1># The default maps to the sample file under game/small/ in the</span>
</span></span><span class=line><span class=cl>  <span class=c1># apache-beam-samples Google Cloud Storage bucket.</span>
</span></span><span class=line><span class=cl>  <span class=n>parser</span><span class=o>.</span><span class=n>add_argument</span><span class=p>(</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;--input&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=nb>type</span><span class=o>=</span><span class=nb>str</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=n>default</span><span class=o>=</span><span class=s1>&#39;gs://apache-beam-samples/game/small/gaming_data.csv&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=n>help</span><span class=o>=</span><span class=s1>&#39;Path to the data file(s) containing game data.&#39;</span><span class=p>)</span>
</span></span><span class=line><span class=cl>  <span class=n>parser</span><span class=o>.</span><span class=n>add_argument</span><span class=p>(</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;--output&#39;</span><span class=p>,</span> <span class=nb>type</span><span class=o>=</span><span class=nb>str</span><span class=p>,</span> <span class=n>required</span><span class=o>=</span><span class=kc>True</span><span class=p>,</span> <span class=n>help</span><span class=o>=</span><span class=s1>&#39;Path to the output file(s).&#39;</span><span class=p>)</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=n>args</span><span class=p>,</span> <span class=n>pipeline_args</span> <span class=o>=</span> <span class=n>parser</span><span class=o>.</span><span class=n>parse_known_args</span><span class=p>(</span><span class=n>argv</span><span class=p>)</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=n>options</span> <span class=o>=</span> <span class=n>PipelineOptions</span><span class=p>(</span><span class=n>pipeline_args</span><span class=p>)</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=c1># We use the save_main_session option because one or more DoFn&#39;s in this</span>
</span></span><span class=line><span class=cl>  <span class=c1># workflow rely on global context (e.g., a module imported at module level).</span>
</span></span><span class=line><span class=cl>  <span class=n>options</span><span class=o>.</span><span class=n>view_as</span><span class=p>(</span><span class=n>SetupOptions</span><span class=p>)</span><span class=o>.</span><span class=n>save_main_session</span> <span class=o>=</span> <span class=n>save_main_session</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=k>with</span> <span class=n>beam</span><span class=o>.</span><span class=n>Pipeline</span><span class=p>(</span><span class=n>options</span><span class=o>=</span><span class=n>options</span><span class=p>)</span> <span class=k>as</span> <span class=n>p</span><span class=p>:</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>    <span class=k>def</span> <span class=nf>format_user_score_sums</span><span class=p>(</span><span class=n>user_score</span><span class=p>):</span>
</span></span><span class=line><span class=cl>      <span class=p>(</span><span class=n>user</span><span class=p>,</span> <span class=n>score</span><span class=p>)</span> <span class=o>=</span> <span class=n>user_score</span>
</span></span><span class=line><span class=cl>      <span class=k>return</span> <span class=s1>&#39;user: </span><span class=si>%s</span><span class=s1>, total_score: </span><span class=si>%s</span><span class=s1>&#39;</span> <span class=o>%</span> <span class=p>(</span><span class=n>user</span><span class=p>,</span> <span class=n>score</span><span class=p>)</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>    <span class=p>(</span>  <span class=c1># pylint: disable=expression-not-assigned</span>
</span></span><span class=line><span class=cl>        <span class=n>p</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;ReadInputText&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>ReadFromText</span><span class=p>(</span><span class=n>args</span><span class=o>.</span><span class=n>input</span><span class=p>)</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;UserScore&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>UserScore</span><span class=p>()</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;FormatUserScoreSums&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>Map</span><span class=p>(</span><span class=n>format_user_score_sums</span><span class=p>)</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;WriteUserScoreSums&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>WriteToText</span><span class=p>(</span><span class=n>args</span><span class=o>.</span><span class=n>output</span><span class=p>))</span></span></span></code></pre></div></div></div><h3 id=limitations>Limitations</h3><p>As written in the example, the <code>UserScore</code> pipeline has a few limitations:</p><ul><li><p>Because some score data may be generated by offline players and sent after the daily cutoff, the results generated by the <code>UserScore</code> pipeline <strong>may be incomplete</strong>. <code>UserScore</code> only processes the fixed input set present in the input file(s) when the pipeline runs.</p></li><li><p><code>UserScore</code> processes all data events present in the input file at processing time, and <strong>does not examine or otherwise error-check events based on event time</strong>. 
Therefore, the results may include some values whose event times fall outside the relevant analysis period, such as late records from the previous day.</p></li><li><p>Because <code>UserScore</code> runs only after all the data has been collected, it has <strong>high latency</strong> between when users generate data events (the event time) and when results are computed (the processing time).</p></li><li><p><code>UserScore</code> also only reports the total results for the entire day, and doesn&rsquo;t provide any finer-grained information about how the data accumulated during the day.</p></li></ul><p>Starting with the next pipeline example, we&rsquo;ll discuss how you can use Beam&rsquo;s features to address these limitations.</p><h2 id=hourlyteamscore-advanced-processing-in-batch-with-windowing>HourlyTeamScore: Advanced Processing in Batch with Windowing</h2><p>The <code>HourlyTeamScore</code> pipeline expands on the basic batch analysis principles used in the <code>UserScore</code> pipeline and addresses some of its limitations. <code>HourlyTeamScore</code> performs finer-grained analysis, both by using additional features in the Beam SDKs and by taking into account more aspects of the game data. For example, <code>HourlyTeamScore</code> can filter out data that isn&rsquo;t part of the relevant analysis period.</p><p>Like <code>UserScore</code>, <code>HourlyTeamScore</code> is best thought of as a job to be run periodically after all the relevant data has been gathered (such as once per day). 
The pipeline reads a fixed data set from a file, and writes the results <span class=language-java>back to a text file</span><span class=language-py>to a Google Cloud BigQuery table</span>.</p><p class=language-java><blockquote><p><strong>Note:</strong> See <a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java>HourlyTeamScore on GitHub</a> for the complete example pipeline program.</p></blockquote></p><p class=language-py><blockquote><p><strong>Note:</strong> See <a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/hourly_team_score.py>HourlyTeamScore on GitHub</a> for the complete example pipeline program.</p></blockquote></p><h3 id=what-does-hourlyteamscore-do>What Does HourlyTeamScore Do?</h3><p><code>HourlyTeamScore</code> calculates the total score per team, per hour, in a fixed data set (such as one day&rsquo;s worth of data).</p><ul><li><p>Rather than operating on the entire data set at once, <code>HourlyTeamScore</code> divides the input data into logical windows and performs calculations on those windows. This allows <code>HourlyTeamScore</code> to provide information on scoring data per window, where each window covers a fixed interval of event time (such as one hour).</p></li><li><p><code>HourlyTeamScore</code> filters data events based on whether their event time (as indicated by the embedded timestamp) falls within the relevant analysis period. The pipeline checks each game event&rsquo;s timestamp and keeps only events whose timestamps fall within the range we want to analyze (in this case, the day in question). Data events from previous days are discarded and not included in the score totals. This makes <code>HourlyTeamScore</code> more robust and less prone to erroneous result data than <code>UserScore</code>. 
It also allows the pipeline to account for late-arriving data that has a timestamp within the relevant analysis period.</p></li></ul><p>Below, we&rsquo;ll look at each of these enhancements in <code>HourlyTeamScore</code> in detail:</p><h4 id=fixed-time-windowing>Fixed-Time Windowing</h4><p>Using fixed-time windowing lets the pipeline provide better information on how events accumulated in the data set over the course of the analysis period. In our case, it tells us when in the day each team was active and how much the team scored at those times.</p><p>The following diagram shows how the pipeline processes a day&rsquo;s worth of scoring data for two teams after applying fixed-time windowing:</p><img src=/images/gaming-example-team-scores-narrow.gif alt="A pipeline processes score data for two teams." width=800px><p><em>Figure 3: Score data for two teams. Each team&rsquo;s scores are divided into
logical windows based on when those scores occurred in event time.</em></p><p>Notice that as processing time advances, the sums are now <em>per window</em>; each window represents an hour of <em>event time</em> during the day in which the scores occurred.</p><blockquote><p><strong>Note:</strong> As shown in the diagram above, using windowing produces an <em>independent total for every interval</em> (in this case, each hour). <code>HourlyTeamScore</code> doesn&rsquo;t provide a running total for the entire data set at each hour; instead, it provides the total score for all the events that occurred <em>only within that hour</em>.</p></blockquote><p>Beam&rsquo;s windowing feature uses the <a href=/documentation/programming-guide/#element-timestamps>intrinsic timestamp information</a> attached to each element of a <code>PCollection</code>. Because we want our pipeline to window based on <em>event time</em>, we <strong>must first extract the timestamp</strong> that&rsquo;s embedded in each data record and apply it to the corresponding element in the <code>PCollection</code> of score data. 
Then, the pipeline can <strong>apply the windowing function</strong> to divide the <code>PCollection</code> into logical windows.</p><p class=language-java><code>HourlyTeamScore</code> uses the <a href=https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java>WithTimestamps</a> and <a href=https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java>Window</a> transforms to perform these operations.</p><p class=language-py><code>HourlyTeamScore</code> uses the <code>FixedWindows</code> transform, found in <a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/transforms/window.py>window.py</a>, to perform these operations.</p><p>The following code shows this:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=c1>// Add an element timestamp based on the event log, and apply fixed windowing.
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>    <span class=s>&#34;AddEventTimestamps&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>    <span class=n>WithTimestamps</span><span class=o>.</span><span class=na>of</span><span class=o>((</span><span class=n>GameActionInfo</span> <span class=n>i</span><span class=o>)</span> <span class=o>-&gt;</span> <span class=k>new</span> <span class=n>Instant</span><span class=o>(</span><span class=n>i</span><span class=o>.</span><span class=na>getTimestamp</span><span class=o>())))</span>
</span></span><span class=line><span class=cl><span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>    <span class=s>&#34;FixedWindowsTeam&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>    <span class=n>Window</span><span class=o>.</span><span class=na>into</span><span class=o>(</span><span class=n>FixedWindows</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>Duration</span><span class=o>.</span><span class=na>standardMinutes</span><span class=o>(</span><span class=n>options</span><span class=o>.</span><span class=na>getWindowDuration</span><span class=o>()))))</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=c1># Add an element timestamp based on the event log, and apply fixed</span>
</span></span><span class=line><span class=cl><span class=c1># windowing.</span>
</span></span><span class=line><span class=cl><span class=o>|</span> <span class=s1>&#39;AddEventTimestamps&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>Map</span><span class=p>(</span>
</span></span><span class=line><span class=cl>    <span class=k>lambda</span> <span class=n>elem</span><span class=p>:</span> <span class=n>beam</span><span class=o>.</span><span class=n>window</span><span class=o>.</span><span class=n>TimestampedValue</span><span class=p>(</span><span class=n>elem</span><span class=p>,</span> <span class=n>elem</span><span class=p>[</span><span class=s1>&#39;timestamp&#39;</span><span class=p>]))</span>
</span></span><span class=line><span class=cl><span class=o>|</span> <span class=s1>&#39;FixedWindowsTeam&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>WindowInto</span><span class=p>(</span>
</span></span><span class=line><span class=cl>    <span class=n>beam</span><span class=o>.</span><span class=n>window</span><span class=o>.</span><span class=n>FixedWindows</span><span class=p>(</span><span class=bp>self</span><span class=o>.</span><span class=n>window_duration_in_seconds</span><span class=p>))</span></span></span></code></pre></div></div></div><p>Notice that the transforms the pipeline uses to specify the windowing are distinct from the actual data processing transforms (such as <code>ExtractAndSumScore</code>). This separation gives you some flexibility in designing your Beam pipeline, in that you can run existing transforms over datasets with different windowing characteristics.</p><h4 id=filtering-based-on-event-time>Filtering Based On Event Time</h4><p><code>HourlyTeamScore</code> uses <strong>filtering</strong> to remove any events from our dataset whose timestamps don&rsquo;t fall within the relevant analysis period (i.e. they weren&rsquo;t generated during the day that we&rsquo;re interested in). This keeps the pipeline from erroneously including any data that was, for example, generated offline during the previous day but sent to the game server during the current day.</p><p>It also lets the pipeline include relevant <strong>late data</strong>: data events with valid timestamps that arrived after our analysis period ended. If our pipeline cutoff time is 12:00 am, for example, we might run the pipeline at 2:00 am, but filter out any events whose timestamps indicate that they occurred after the 12:00 am cutoff. Data events that were delayed and arrived between 12:01 am and 2:00 am, but whose timestamps indicate that they occurred before the 12:00 am cutoff, would be included in the pipeline processing.</p><p><code>HourlyTeamScore</code> uses the <code>Filter</code> transform to perform this operation. When you apply <code>Filter</code>, you specify a predicate that is evaluated against each data record. 
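</p><p class=language-py>As a rough illustration of how such a predicate behaves, the following plain-Python sketch (not Beam code) keeps only events whose timestamps fall strictly inside an analysis period. The names <code>START_TS</code>, <code>STOP_TS</code>, and <code>in_analysis_period</code>, along with the sample events, are hypothetical, and the sketch assumes integer timestamps in seconds:</p><div class='language-py snippet'><div class="notebook-skip code-snippet"><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py># Illustrative only: plain Python, not the Beam API.
# START_TS, STOP_TS, and the sample events are hypothetical values.
START_TS = 1000  # start of the analysis period (epoch seconds)
STOP_TS = 2000   # end of the analysis period (epoch seconds)

events = [
    {'team': 'red', 'score': 5, 'timestamp': 900},    # before the period
    {'team': 'red', 'score': 7, 'timestamp': 1500},   # inside the period
    {'team': 'blue', 'score': 3, 'timestamp': 2100},  # after the period
]


def in_analysis_period(event):
  """Returns True when the event time lies strictly inside the period."""
  # Assumes integer timestamps: the range excludes both START_TS and STOP_TS.
  return event['timestamp'] in range(START_TS + 1, STOP_TS)


# Events that satisfy the predicate are kept; the others are dropped.
kept = list(filter(in_analysis_period, events))
print(kept)
</code></pre></div></div></div><p>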
Data records that satisfy the predicate are included, while records that fail it are excluded. In our case, the predicate compares just one part of each record, the timestamp field, against the cut-off times we specify.</p><p>The following code shows how <code>HourlyTeamScore</code> uses the <code>Filter</code> transform to filter out events that occur either before or after the relevant analysis period:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>    <span class=s>&#34;FilterStartTime&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>    <span class=n>Filter</span><span class=o>.</span><span class=na>by</span><span class=o>(</span>
</span></span><span class=line><span class=cl>        <span class=o>(</span><span class=n>GameActionInfo</span> <span class=n>gInfo</span><span class=o>)</span> <span class=o>-&gt;</span> <span class=n>gInfo</span><span class=o>.</span><span class=na>getTimestamp</span><span class=o>()</span> <span class=o>&gt;</span> <span class=n>startMinTimestamp</span><span class=o>.</span><span class=na>getMillis</span><span class=o>()))</span>
</span></span><span class=line><span class=cl><span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>    <span class=s>&#34;FilterEndTime&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>    <span class=n>Filter</span><span class=o>.</span><span class=na>by</span><span class=o>(</span>
</span></span><span class=line><span class=cl>        <span class=o>(</span><span class=n>GameActionInfo</span> <span class=n>gInfo</span><span class=o>)</span> <span class=o>-&gt;</span> <span class=n>gInfo</span><span class=o>.</span><span class=na>getTimestamp</span><span class=o>()</span> <span class=o>&lt;</span> <span class=n>stopMinTimestamp</span><span class=o>.</span><span class=na>getMillis</span><span class=o>()))</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=o>|</span> <span class=s1>&#39;FilterStartTime&#39;</span> <span class=o>&gt;&gt;</span>
</span></span><span class=line><span class=cl>    <span class=n>beam</span><span class=o>.</span><span class=n>Filter</span><span class=p>(</span><span class=k>lambda</span> <span class=n>elem</span><span class=p>:</span> <span class=n>elem</span><span class=p>[</span><span class=s1>&#39;timestamp&#39;</span><span class=p>]</span> <span class=o>&gt;</span> <span class=bp>self</span><span class=o>.</span><span class=n>start_timestamp</span><span class=p>)</span>
</span></span><span class=line><span class=cl><span class=o>|</span> <span class=s1>&#39;FilterEndTime&#39;</span> <span class=o>&gt;&gt;</span>
</span></span><span class=line><span class=cl>    <span class=n>beam</span><span class=o>.</span><span class=n>Filter</span><span class=p>(</span><span class=k>lambda</span> <span class=n>elem</span><span class=p>:</span> <span class=n>elem</span><span class=p>[</span><span class=s1>&#39;timestamp&#39;</span><span class=p>]</span> <span class=o>&lt;</span> <span class=bp>self</span><span class=o>.</span><span class=n>stop_timestamp</span><span class=p>)</span></span></span></code></pre></div></div></div><h4 id=calculating-score-per-team-per-window>Calculating Score Per Team, Per Window</h4><p><code>HourlyTeamScore</code> uses the same <code>ExtractAndSumScore</code> transform as the <code>UserScore</code> pipeline, but passes a different key (team, as opposed to user). Also, because the pipeline applies <code>ExtractAndSumScore</code> <em>after</em> applying fixed-time 1-hour windowing to the input data, the data gets grouped by both team <em>and</em> window. You can see the full sequence of transforms in <code>HourlyTeamScore</code>&rsquo;s main method:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=kd>public</span> <span class=kd>static</span> <span class=kt>void</span> <span class=nf>main</span><span class=o>(</span><span class=n>String</span><span class=o>[]</span> <span class=n>args</span><span class=o>)</span> <span class=kd>throws</span> <span class=n>Exception</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>  <span class=c1>// Begin constructing a pipeline configured by command-line flags.
</span></span></span><span class=line><span class=cl><span class=c1></span>  <span class=n>Options</span> <span class=n>options</span> <span class=o>=</span> <span class=n>PipelineOptionsFactory</span><span class=o>.</span><span class=na>fromArgs</span><span class=o>(</span><span class=n>args</span><span class=o>).</span><span class=na>withValidation</span><span class=o>().</span><span class=na>as</span><span class=o>(</span><span class=n>Options</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl>  <span class=n>Pipeline</span> <span class=n>pipeline</span> <span class=o>=</span> <span class=n>Pipeline</span><span class=o>.</span><span class=na>create</span><span class=o>(</span><span class=n>options</span><span class=o>);</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=kd>final</span> <span class=n>Instant</span> <span class=n>stopMinTimestamp</span> <span class=o>=</span> <span class=k>new</span> <span class=n>Instant</span><span class=o>(</span><span class=n>minFmt</span><span class=o>.</span><span class=na>parseMillis</span><span class=o>(</span><span class=n>options</span><span class=o>.</span><span class=na>getStopMin</span><span class=o>()));</span>
</span></span><span class=line><span class=cl>  <span class=kd>final</span> <span class=n>Instant</span> <span class=n>startMinTimestamp</span> <span class=o>=</span> <span class=k>new</span> <span class=n>Instant</span><span class=o>(</span><span class=n>minFmt</span><span class=o>.</span><span class=na>parseMillis</span><span class=o>(</span><span class=n>options</span><span class=o>.</span><span class=na>getStartMin</span><span class=o>()));</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=c1>// Read &#39;gaming&#39; events from a text file.
</span></span></span><span class=line><span class=cl><span class=c1></span>  <span class=n>pipeline</span>
</span></span><span class=line><span class=cl>      <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>TextIO</span><span class=o>.</span><span class=na>read</span><span class=o>().</span><span class=na>from</span><span class=o>(</span><span class=n>options</span><span class=o>.</span><span class=na>getInput</span><span class=o>()))</span>
</span></span><span class=line><span class=cl>      <span class=c1>// Parse the incoming data.
</span></span></span><span class=line><span class=cl><span class=c1></span>      <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;ParseGameEvent&#34;</span><span class=o>,</span> <span class=n>ParDo</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=k>new</span> <span class=n>ParseEventFn</span><span class=o>()))</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>      <span class=c1>// Filter out data before and after the given times so that it is not included
</span></span></span><span class=line><span class=cl><span class=c1></span>      <span class=c1>// in the calculations. As we collect data in batches (say, by day), the batch for the day
</span></span></span><span class=line><span class=cl><span class=c1></span>      <span class=c1>// that we want to analyze could potentially include some late-arriving data from the
</span></span></span><span class=line><span class=cl><span class=c1></span>      <span class=c1>// previous day.
</span></span></span><span class=line><span class=cl><span class=c1></span>      <span class=c1>// If so, we want to weed it out. Similarly, if we include data from the following day
</span></span></span><span class=line><span class=cl><span class=c1></span>      <span class=c1>// (to scoop up late-arriving events from the day we&#39;re analyzing), we need to weed out
</span></span></span><span class=line><span class=cl><span class=c1></span>      <span class=c1>// events that fall after the time period we want to analyze.
</span></span></span><span class=line><span class=cl><span class=c1></span>      <span class=c1>// [START DocInclude_HTSFilters]
</span></span></span><span class=line><span class=cl><span class=c1></span>      <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>          <span class=s>&#34;FilterStartTime&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>          <span class=n>Filter</span><span class=o>.</span><span class=na>by</span><span class=o>(</span>
</span></span><span class=line><span class=cl>              <span class=o>(</span><span class=n>GameActionInfo</span> <span class=n>gInfo</span><span class=o>)</span> <span class=o>-&gt;</span> <span class=n>gInfo</span><span class=o>.</span><span class=na>getTimestamp</span><span class=o>()</span> <span class=o>&gt;</span> <span class=n>startMinTimestamp</span><span class=o>.</span><span class=na>getMillis</span><span class=o>()))</span>
</span></span><span class=line><span class=cl>      <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>          <span class=s>&#34;FilterEndTime&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>          <span class=n>Filter</span><span class=o>.</span><span class=na>by</span><span class=o>(</span>
</span></span><span class=line><span class=cl>              <span class=o>(</span><span class=n>GameActionInfo</span> <span class=n>gInfo</span><span class=o>)</span> <span class=o>-&gt;</span> <span class=n>gInfo</span><span class=o>.</span><span class=na>getTimestamp</span><span class=o>()</span> <span class=o>&lt;</span> <span class=n>stopMinTimestamp</span><span class=o>.</span><span class=na>getMillis</span><span class=o>()))</span>
</span></span><span class=line><span class=cl>      <span class=c1>// [END DocInclude_HTSFilters]
</span></span></span><span class=line><span class=cl><span class=c1></span>
</span></span><span class=line><span class=cl>      <span class=c1>// [START DocInclude_HTSAddTsAndWindow]
</span></span></span><span class=line><span class=cl><span class=c1></span>      <span class=c1>// Add an element timestamp based on the event log, and apply fixed windowing.
</span></span></span><span class=line><span class=cl><span class=c1></span>      <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>          <span class=s>&#34;AddEventTimestamps&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>          <span class=n>WithTimestamps</span><span class=o>.</span><span class=na>of</span><span class=o>((</span><span class=n>GameActionInfo</span> <span class=n>i</span><span class=o>)</span> <span class=o>-&gt;</span> <span class=k>new</span> <span class=n>Instant</span><span class=o>(</span><span class=n>i</span><span class=o>.</span><span class=na>getTimestamp</span><span class=o>())))</span>
</span></span><span class=line><span class=cl>      <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>          <span class=s>&#34;FixedWindowsTeam&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>          <span class=n>Window</span><span class=o>.</span><span class=na>into</span><span class=o>(</span><span class=n>FixedWindows</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>Duration</span><span class=o>.</span><span class=na>standardMinutes</span><span class=o>(</span><span class=n>options</span><span class=o>.</span><span class=na>getWindowDuration</span><span class=o>()))))</span>
</span></span><span class=line><span class=cl>      <span class=c1>// [END DocInclude_HTSAddTsAndWindow]
</span></span></span><span class=line><span class=cl><span class=c1></span>
</span></span><span class=line><span class=cl>      <span class=c1>// Extract and sum teamname/score pairs from the event data.
</span></span></span><span class=line><span class=cl><span class=c1></span>      <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;ExtractTeamScore&#34;</span><span class=o>,</span> <span class=k>new</span> <span class=n>ExtractAndSumScore</span><span class=o>(</span><span class=s>&#34;team&#34;</span><span class=o>))</span>
</span></span><span class=line><span class=cl>      <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>          <span class=s>&#34;WriteTeamScoreSums&#34;</span><span class=o>,</span> <span class=k>new</span> <span class=n>WriteToText</span><span class=o>&lt;&gt;(</span><span class=n>options</span><span class=o>.</span><span class=na>getOutput</span><span class=o>(),</span> <span class=n>configureOutput</span><span class=o>(),</span> <span class=kc>true</span><span class=o>));</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=n>pipeline</span><span class=o>.</span><span class=na>run</span><span class=o>().</span><span class=na>waitUntilFinish</span><span class=o>();</span>
</span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=k>class</span> <span class=nc>HourlyTeamScore</span><span class=p>(</span><span class=n>beam</span><span class=o>.</span><span class=n>PTransform</span><span class=p>):</span>
</span></span><span class=line><span class=cl>  <span class=k>def</span> <span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>start_min</span><span class=p>,</span> <span class=n>stop_min</span><span class=p>,</span> <span class=n>window_duration</span><span class=p>):</span>
</span></span><span class=line><span class=cl>    <span class=c1># TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.</span>
</span></span><span class=line><span class=cl>    <span class=c1># super().__init__()</span>
</span></span><span class=line><span class=cl>    <span class=n>beam</span><span class=o>.</span><span class=n>PTransform</span><span class=o>.</span><span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>)</span>
</span></span><span class=line><span class=cl>    <span class=bp>self</span><span class=o>.</span><span class=n>start_timestamp</span> <span class=o>=</span> <span class=n>str2timestamp</span><span class=p>(</span><span class=n>start_min</span><span class=p>)</span>
</span></span><span class=line><span class=cl>    <span class=bp>self</span><span class=o>.</span><span class=n>stop_timestamp</span> <span class=o>=</span> <span class=n>str2timestamp</span><span class=p>(</span><span class=n>stop_min</span><span class=p>)</span>
</span></span><span class=line><span class=cl>    <span class=bp>self</span><span class=o>.</span><span class=n>window_duration_in_seconds</span> <span class=o>=</span> <span class=n>window_duration</span> <span class=o>*</span> <span class=mi>60</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=k>def</span> <span class=nf>expand</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>pcoll</span><span class=p>):</span>
</span></span><span class=line><span class=cl>    <span class=k>return</span> <span class=p>(</span>
</span></span><span class=line><span class=cl>        <span class=n>pcoll</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;ParseGameEventFn&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span class=n>ParseGameEventFn</span><span class=p>())</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>        <span class=c1># Filter out data before and after the given times so that it is not</span>
</span></span><span class=line><span class=cl>        <span class=c1># included in the calculations. As we collect data in batches (say, by</span>
</span></span><span class=line><span class=cl>        <span class=c1># day), the batch for the day that we want to analyze could potentially</span>
</span></span><span class=line><span class=cl>        <span class=c1># include some late-arriving data from the previous day. If so, we want</span>
</span></span><span class=line><span class=cl>        <span class=c1># to weed it out. Similarly, if we include data from the following day</span>
</span></span><span class=line><span class=cl>        <span class=c1># (to scoop up late-arriving events from the day we&#39;re analyzing), we</span>
</span></span><span class=line><span class=cl>        <span class=c1># need to weed out events that fall after the time period we want to</span>
</span></span><span class=line><span class=cl>        <span class=c1># analyze.</span>
</span></span><span class=line><span class=cl>        <span class=c1># [START filter_by_time_range]</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;FilterStartTime&#39;</span> <span class=o>&gt;&gt;</span>
</span></span><span class=line><span class=cl>        <span class=n>beam</span><span class=o>.</span><span class=n>Filter</span><span class=p>(</span><span class=k>lambda</span> <span class=n>elem</span><span class=p>:</span> <span class=n>elem</span><span class=p>[</span><span class=s1>&#39;timestamp&#39;</span><span class=p>]</span> <span class=o>&gt;</span> <span class=bp>self</span><span class=o>.</span><span class=n>start_timestamp</span><span class=p>)</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;FilterEndTime&#39;</span> <span class=o>&gt;&gt;</span>
</span></span><span class=line><span class=cl>        <span class=n>beam</span><span class=o>.</span><span class=n>Filter</span><span class=p>(</span><span class=k>lambda</span> <span class=n>elem</span><span class=p>:</span> <span class=n>elem</span><span class=p>[</span><span class=s1>&#39;timestamp&#39;</span><span class=p>]</span> <span class=o>&lt;</span> <span class=bp>self</span><span class=o>.</span><span class=n>stop_timestamp</span><span class=p>)</span>
</span></span><span class=line><span class=cl>        <span class=c1># [END filter_by_time_range]</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>        <span class=c1># [START add_timestamp_and_window]</span>
</span></span><span class=line><span class=cl>        <span class=c1># Add an element timestamp based on the event log, and apply fixed</span>
</span></span><span class=line><span class=cl>        <span class=c1># windowing.</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;AddEventTimestamps&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>Map</span><span class=p>(</span>
</span></span><span class=line><span class=cl>            <span class=k>lambda</span> <span class=n>elem</span><span class=p>:</span> <span class=n>beam</span><span class=o>.</span><span class=n>window</span><span class=o>.</span><span class=n>TimestampedValue</span><span class=p>(</span><span class=n>elem</span><span class=p>,</span> <span class=n>elem</span><span class=p>[</span><span class=s1>&#39;timestamp&#39;</span><span class=p>]))</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;FixedWindowsTeam&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>WindowInto</span><span class=p>(</span>
</span></span><span class=line><span class=cl>            <span class=n>beam</span><span class=o>.</span><span class=n>window</span><span class=o>.</span><span class=n>FixedWindows</span><span class=p>(</span><span class=bp>self</span><span class=o>.</span><span class=n>window_duration_in_seconds</span><span class=p>))</span>
</span></span><span class=line><span class=cl>        <span class=c1># [END add_timestamp_and_window]</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>        <span class=c1># Extract and sum teamname/score pairs from the event data.</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;ExtractAndSumScore&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>ExtractAndSumScore</span><span class=p>(</span><span class=s1>&#39;team&#39;</span><span class=p>))</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl><span class=k>def</span> <span class=nf>run</span><span class=p>(</span><span class=n>argv</span><span class=o>=</span><span class=kc>None</span><span class=p>,</span> <span class=n>save_main_session</span><span class=o>=</span><span class=kc>True</span><span class=p>):</span>
</span></span><span class=line><span class=cl>  <span class=s2>&#34;&#34;&#34;Main entry point; defines and runs the hourly_team_score pipeline.&#34;&#34;&#34;</span>
</span></span><span class=line><span class=cl>  <span class=n>parser</span> <span class=o>=</span> <span class=n>argparse</span><span class=o>.</span><span class=n>ArgumentParser</span><span class=p>()</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=c1># The default maps to two large Google Cloud Storage files (each ~12GB)</span>
</span></span><span class=line><span class=cl>  <span class=c1># holding two consecutive days&#39; worth (roughly) of data.</span>
</span></span><span class=line><span class=cl>  <span class=n>parser</span><span class=o>.</span><span class=n>add_argument</span><span class=p>(</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;--input&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=nb>type</span><span class=o>=</span><span class=nb>str</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=n>default</span><span class=o>=</span><span class=s1>&#39;gs://apache-beam-samples/game/gaming_data*.csv&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=n>help</span><span class=o>=</span><span class=s1>&#39;Path to the data file(s) containing game data.&#39;</span><span class=p>)</span>
</span></span><span class=line><span class=cl>  <span class=n>parser</span><span class=o>.</span><span class=n>add_argument</span><span class=p>(</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;--dataset&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=nb>type</span><span class=o>=</span><span class=nb>str</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=n>required</span><span class=o>=</span><span class=kc>True</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=n>help</span><span class=o>=</span><span class=s1>&#39;BigQuery Dataset to write tables to. &#39;</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;Must already exist.&#39;</span><span class=p>)</span>
</span></span><span class=line><span class=cl>  <span class=n>parser</span><span class=o>.</span><span class=n>add_argument</span><span class=p>(</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;--table_name&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=n>default</span><span class=o>=</span><span class=s1>&#39;leader_board&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=n>help</span><span class=o>=</span><span class=s1>&#39;The BigQuery table name. Should not already exist.&#39;</span><span class=p>)</span>
</span></span><span class=line><span class=cl>  <span class=n>parser</span><span class=o>.</span><span class=n>add_argument</span><span class=p>(</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;--window_duration&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=nb>type</span><span class=o>=</span><span class=nb>int</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=n>default</span><span class=o>=</span><span class=mi>60</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=n>help</span><span class=o>=</span><span class=s1>&#39;Numeric value of fixed window duration, in minutes&#39;</span><span class=p>)</span>
</span></span><span class=line><span class=cl>  <span class=n>parser</span><span class=o>.</span><span class=n>add_argument</span><span class=p>(</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;--start_min&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=nb>type</span><span class=o>=</span><span class=nb>str</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=n>default</span><span class=o>=</span><span class=s1>&#39;1970-01-01-00-00&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=n>help</span><span class=o>=</span><span class=s1>&#39;String representation of the first minute after &#39;</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;which to generate results in the format: &#39;</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;yyyy-MM-dd-HH-mm. Any input data timestamped &#39;</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;prior to that minute won</span><span class=se>\&#39;</span><span class=s1>t be included in the &#39;</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;sums.&#39;</span><span class=p>)</span>
</span></span><span class=line><span class=cl>  <span class=n>parser</span><span class=o>.</span><span class=n>add_argument</span><span class=p>(</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;--stop_min&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=nb>type</span><span class=o>=</span><span class=nb>str</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=n>default</span><span class=o>=</span><span class=s1>&#39;2100-01-01-00-00&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>      <span class=n>help</span><span class=o>=</span><span class=s1>&#39;String representation of the first minute for &#39;</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;which to not generate results in the format: &#39;</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;yyyy-MM-dd-HH-mm. Any input data timestamped &#39;</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;after that minute won</span><span class=se>\&#39;</span><span class=s1>t be included in the &#39;</span>
</span></span><span class=line><span class=cl>      <span class=s1>&#39;sums.&#39;</span><span class=p>)</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=n>args</span><span class=p>,</span> <span class=n>pipeline_args</span> <span class=o>=</span> <span class=n>parser</span><span class=o>.</span><span class=n>parse_known_args</span><span class=p>(</span><span class=n>argv</span><span class=p>)</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=n>options</span> <span class=o>=</span> <span class=n>PipelineOptions</span><span class=p>(</span><span class=n>pipeline_args</span><span class=p>)</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=c1># We also require the --project option to access --dataset</span>
</span></span><span class=line><span class=cl>  <span class=k>if</span> <span class=n>options</span><span class=o>.</span><span class=n>view_as</span><span class=p>(</span><span class=n>GoogleCloudOptions</span><span class=p>)</span><span class=o>.</span><span class=n>project</span> <span class=ow>is</span> <span class=kc>None</span><span class=p>:</span>
</span></span><span class=line><span class=cl>    <span class=n>parser</span><span class=o>.</span><span class=n>print_usage</span><span class=p>()</span>
</span></span><span class=line><span class=cl>    <span class=nb>print</span><span class=p>(</span><span class=n>sys</span><span class=o>.</span><span class=n>argv</span><span class=p>[</span><span class=mi>0</span><span class=p>]</span> <span class=o>+</span> <span class=s1>&#39;: error: argument --project is required&#39;</span><span class=p>)</span>
</span></span><span class=line><span class=cl>    <span class=n>sys</span><span class=o>.</span><span class=n>exit</span><span class=p>(</span><span class=mi>1</span><span class=p>)</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=c1># We use the save_main_session option because one or more DoFn&#39;s in this</span>
</span></span><span class=line><span class=cl>  <span class=c1># workflow rely on global context (e.g., a module imported at module level).</span>
</span></span><span class=line><span class=cl>  <span class=n>options</span><span class=o>.</span><span class=n>view_as</span><span class=p>(</span><span class=n>SetupOptions</span><span class=p>)</span><span class=o>.</span><span class=n>save_main_session</span> <span class=o>=</span> <span class=n>save_main_session</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=k>with</span> <span class=n>beam</span><span class=o>.</span><span class=n>Pipeline</span><span class=p>(</span><span class=n>options</span><span class=o>=</span><span class=n>options</span><span class=p>)</span> <span class=k>as</span> <span class=n>p</span><span class=p>:</span>
</span></span><span class=line><span class=cl>    <span class=p>(</span>  <span class=c1># pylint: disable=expression-not-assigned</span>
</span></span><span class=line><span class=cl>        <span class=n>p</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;ReadInputText&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>io</span><span class=o>.</span><span class=n>ReadFromText</span><span class=p>(</span><span class=n>args</span><span class=o>.</span><span class=n>input</span><span class=p>)</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;HourlyTeamScore&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>HourlyTeamScore</span><span class=p>(</span>
</span></span><span class=line><span class=cl>            <span class=n>args</span><span class=o>.</span><span class=n>start_min</span><span class=p>,</span> <span class=n>args</span><span class=o>.</span><span class=n>stop_min</span><span class=p>,</span> <span class=n>args</span><span class=o>.</span><span class=n>window_duration</span><span class=p>)</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;TeamScoresDict&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span class=n>TeamScoresDict</span><span class=p>())</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;WriteTeamScoreSums&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>WriteToBigQuery</span><span class=p>(</span>
</span></span><span class=line><span class=cl>            <span class=n>args</span><span class=o>.</span><span class=n>table_name</span><span class=p>,</span>
</span></span><span class=line><span class=cl>            <span class=n>args</span><span class=o>.</span><span class=n>dataset</span><span class=p>,</span>
</span></span><span class=line><span class=cl>            <span class=p>{</span>
</span></span><span class=line><span class=cl>                <span class=s1>&#39;team&#39;</span><span class=p>:</span> <span class=s1>&#39;STRING&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>                <span class=s1>&#39;total_score&#39;</span><span class=p>:</span> <span class=s1>&#39;INTEGER&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>                <span class=s1>&#39;window_start&#39;</span><span class=p>:</span> <span class=s1>&#39;STRING&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>            <span class=p>},</span>
</span></span><span class=line><span class=cl>            <span class=n>options</span><span class=o>.</span><span class=n>view_as</span><span class=p>(</span><span class=n>GoogleCloudOptions</span><span class=p>)</span><span class=o>.</span><span class=n>project</span><span class=p>))</span></span></span></code></pre></div></div></div><h3 id=limitations-1>Limitations</h3><p>As written, <code>HourlyTeamScore</code> still has a limitation:</p><ul><li><code>HourlyTeamScore</code> has <strong>high latency</strong> between when data events occur (the event time) and when results are generated (the processing time) because, as a batch pipeline, it cannot begin processing until all data events are present.</li></ul><h2 id=leaderboard-streaming-processing-with-real-time-game-data>LeaderBoard: Streaming Processing with Real-Time Game Data</h2><p>One way to address the latency issue in the <code>UserScore</code> and <code>HourlyTeamScore</code> pipelines is to read the score data from an unbounded source. The <code>LeaderBoard</code> pipeline introduces streaming processing by reading the game score data from a source that produces an infinite amount of data, rather than from a file on the game server.</p><p>The <code>LeaderBoard</code> pipeline also demonstrates how to process game score data with respect to both <em>processing time</em> and <em>event time</em>. <code>LeaderBoard</code> outputs data about both individual user scores and team scores, each with respect to a different time frame.</p><p>Because the <code>LeaderBoard</code> pipeline reads the game data from an unbounded source as that data is generated, you can think of the pipeline as an ongoing job running concurrently with the game process. 
<code>LeaderBoard</code> can thus provide low-latency insights into how users are playing the game at any given moment — useful if, for example, we want to provide a live web-based scoreboard so that users can track their progress against other users as they play.</p><p class=language-java><blockquote><p><strong>Note:</strong> See <a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java>LeaderBoard on GitHub</a> for the complete example pipeline program.</p></blockquote></p><p class=language-py><blockquote><p><strong>Note:</strong> See <a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/leader_board.py>LeaderBoard on GitHub</a> for the complete example pipeline program.</p></blockquote></p><h3 id=what-does-leaderboard-do>What Does LeaderBoard Do?</h3><p>The <code>LeaderBoard</code> pipeline reads game data published to an unbounded source that produces an infinite amount of data in near real-time, and uses that data to perform two separate processing tasks:</p><ul><li><p><code>LeaderBoard</code> calculates the total score for every unique user and publishes speculative results for every ten minutes of <em>processing time</em>. That is, ten minutes after data is received, the pipeline outputs the total score per user that the pipeline has processed to date. This calculation provides a running &ldquo;leader board&rdquo; in close to real time, regardless of when the actual game events were generated.</p></li><li><p><code>LeaderBoard</code> calculates the team scores for each hour that the pipeline runs. This is useful if we want to, for example, reward the top-scoring team for each hour of play. 
The team score calculation uses fixed-time windowing to divide the input data into hour-long finite windows based on the <em>event time</em> (indicated by the timestamp) as data arrives in the pipeline.</p><p>In addition, the team score calculation uses Beam&rsquo;s trigger mechanisms to provide speculative results for each hour (which update every five minutes until the hour is up), and to also capture any late data and add it to the specific hour-long window to which it belongs.</p></li></ul><p>Below, we&rsquo;ll look at both of these tasks in detail.</p><h4 id=calculating-user-score-based-on-processing-time>Calculating User Score based on Processing Time</h4><p>We want our pipeline to output a running total score for each user for every ten minutes of processing time. This calculation doesn&rsquo;t consider <em>when</em> the actual score was generated by the user&rsquo;s play instance; it simply outputs the sum of all the scores for that user that have arrived in the pipeline to date. Late data gets included in the calculation whenever it happens to arrive in the pipeline as it&rsquo;s running.</p><p>Because we want all the data that has arrived in the pipeline every time we update our calculation, we have the pipeline consider all of the user score data in a <strong>single global window</strong>. The single global window is unbounded, but we can specify a kind of temporary cut-off point for each ten-minute calculation by using a processing time <a href=/documentation/programming-guide/#triggers>trigger</a>.</p><p>When we specify a ten-minute processing time trigger for the single global window, the pipeline effectively takes a &ldquo;snapshot&rdquo; of the contents of the window every time the trigger fires. This snapshot happens after ten minutes have passed since data was received. If no data has arrived, the pipeline takes its next &ldquo;snapshot&rdquo; 10 minutes after an element arrives. 
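</p><p class=language-py>The snapshot timing described above can be illustrated without Beam. The following is a minimal plain-Python sketch, not the <code>LeaderBoard</code> implementation: the names <code>snapshots</code>, <code>arrivals</code>, and the seconds-based <code>TEN_MINUTES</code> constant are hypothetical, and a real runner may fire at slightly different times. It assumes a timer starts when the first element of a pane arrives, fires ten minutes later, and emits running totals that are never reset.</p><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py>from collections import defaultdict

TEN_MINUTES = 10 * 60  # trigger delay, in seconds of processing time


def snapshots(arrivals, delay=TEN_MINUTES):
    """Simulate a repeating processing-time trigger on a single global window.

    arrivals: (arrival_time_seconds, user, score) tuples, sorted by arrival time.
    Returns a list of (fire_time, running_totals) pairs, one per trigger firing.
    """
    totals = defaultdict(int)  # accumulating mode: never reset between firings
    fired = []
    deadline = None  # no timer is pending until a pane receives an element
    for arrival_time, user, score in arrivals:
        # Fire the pending timer before handling an element that arrives after it.
        if deadline is not None and arrival_time &gt;= deadline:
            fired.append((deadline, dict(totals)))
            deadline = None
        if deadline is None:
            # The first element of a new pane starts the ten-minute countdown.
            deadline = arrival_time + delay
        totals[user] += score
    if deadline is not None:
        fired.append((deadline, dict(totals)))  # final pending firing
    return fired


scores = [(0, 'alice', 5), (120, 'bob', 3), (700, 'alice', 2), (1400, 'bob', 4)]
for fire_time, running in snapshots(scores):
    print(fire_time, running)
</code></pre></div></div></div><p class=language-py>In this sketch no timer is pending while the window is idle, so the gap between two snapshots can be longer than ten minutes when no elements arrive.</p><p>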
Since we&rsquo;re using a single global window, each snapshot contains all the data collected <em>to that point in time</em>. The following diagram shows the effects of using a processing time trigger on the single global window:</p><img src=/images/gaming-example-proc-time-narrow.gif alt="A pipeline processes score data for three users." width=850px><p><em>Figure 4: Score data for three users. Each user&rsquo;s scores are grouped together
in a single global window, with a trigger that generates a snapshot for output
ten minutes after data is received.</em></p><p>As processing time advances and more scores are processed, the trigger outputs the updated sum for each user.</p><p>The following code example shows how <code>LeaderBoard</code> sets the processing time trigger to output the data for user scores:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=cm>/**
</span></span></span><span class=line><span class=cl><span class=cm> * Extract user/score pairs from the event stream using processing time, via global windowing. Get
</span></span></span><span class=line><span class=cl><span class=cm> * periodic updates on all users&#39; running scores.
</span></span></span><span class=line><span class=cl><span class=cm> */</span>
</span></span><span class=line><span class=cl><span class=nd>@VisibleForTesting</span>
</span></span><span class=line><span class=cl><span class=kd>static</span> <span class=kd>class</span> <span class=nc>CalculateUserScores</span>
</span></span><span class=line><span class=cl>    <span class=kd>extends</span> <span class=n>PTransform</span><span class=o>&lt;</span><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>GameActionInfo</span><span class=o>&gt;,</span> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Integer</span><span class=o>&gt;&gt;&gt;</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>  <span class=kd>private</span> <span class=kd>final</span> <span class=n>Duration</span> <span class=n>allowedLateness</span><span class=o>;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=n>CalculateUserScores</span><span class=o>(</span><span class=n>Duration</span> <span class=n>allowedLateness</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>    <span class=k>this</span><span class=o>.</span><span class=na>allowedLateness</span> <span class=o>=</span> <span class=n>allowedLateness</span><span class=o>;</span>
</span></span><span class=line><span class=cl>  <span class=o>}</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=nd>@Override</span>
</span></span><span class=line><span class=cl>  <span class=kd>public</span> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Integer</span><span class=o>&gt;&gt;</span> <span class=nf>expand</span><span class=o>(</span><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>GameActionInfo</span><span class=o>&gt;</span> <span class=n>input</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>    <span class=k>return</span> <span class=n>input</span>
</span></span><span class=line><span class=cl>        <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>            <span class=s>&#34;LeaderboardUserGlobalWindow&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>            <span class=n>Window</span><span class=o>.&lt;</span><span class=n>GameActionInfo</span><span class=o>&gt;</span><span class=n>into</span><span class=o>(</span><span class=k>new</span> <span class=n>GlobalWindows</span><span class=o>())</span>
</span></span><span class=line><span class=cl>                <span class=c1>// Get periodic results every ten minutes.
</span></span></span><span class=line><span class=cl><span class=c1></span>                <span class=o>.</span><span class=na>triggering</span><span class=o>(</span>
</span></span><span class=line><span class=cl>                    <span class=n>Repeatedly</span><span class=o>.</span><span class=na>forever</span><span class=o>(</span>
</span></span><span class=line><span class=cl>                        <span class=n>AfterProcessingTime</span><span class=o>.</span><span class=na>pastFirstElementInPane</span><span class=o>().</span><span class=na>plusDelayOf</span><span class=o>(</span><span class=n>TEN_MINUTES</span><span class=o>)))</span>
</span></span><span class=line><span class=cl>                <span class=o>.</span><span class=na>accumulatingFiredPanes</span><span class=o>()</span>
</span></span><span class=line><span class=cl>                <span class=o>.</span><span class=na>withAllowedLateness</span><span class=o>(</span><span class=n>allowedLateness</span><span class=o>))</span>
</span></span><span class=line><span class=cl>        <span class=c1>// Extract and sum username/score pairs from the event data.
</span></span></span><span class=line><span class=cl><span class=c1></span>        <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;ExtractUserScore&#34;</span><span class=o>,</span> <span class=k>new</span> <span class=n>ExtractAndSumScore</span><span class=o>(</span><span class=s>&#34;user&#34;</span><span class=o>));</span>
</span></span><span class=line><span class=cl>  <span class=o>}</span>
</span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=k>class</span> <span class=nc>CalculateUserScores</span><span class=p>(</span><span class=n>beam</span><span class=o>.</span><span class=n>PTransform</span><span class=p>):</span>
</span></span><span class=line><span class=cl>  <span class=s2>&#34;&#34;&#34;Extract user/score pairs from the event stream using processing time, via
</span></span></span><span class=line><span class=cl><span class=s2>  global windowing. Get periodic updates on all users&#39; running scores.
</span></span></span><span class=line><span class=cl><span class=s2>  &#34;&#34;&#34;</span>
</span></span><span class=line><span class=cl>  <span class=k>def</span> <span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>allowed_lateness</span><span class=p>):</span>
</span></span><span class=line><span class=cl>    <span class=c1># TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.</span>
</span></span><span class=line><span class=cl>    <span class=c1># super().__init__()</span>
</span></span><span class=line><span class=cl>    <span class=n>beam</span><span class=o>.</span><span class=n>PTransform</span><span class=o>.</span><span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>)</span>
</span></span><span class=line><span class=cl>    <span class=bp>self</span><span class=o>.</span><span class=n>allowed_lateness_seconds</span> <span class=o>=</span> <span class=n>allowed_lateness</span> <span class=o>*</span> <span class=mi>60</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=k>def</span> <span class=nf>expand</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>pcoll</span><span class=p>):</span>
</span></span><span class=line><span class=cl>    <span class=c1># NOTE: the behavior does not exactly match the Java example</span>
</span></span><span class=line><span class=cl>    <span class=c1># TODO: allowed_lateness not implemented yet in FixedWindows</span>
</span></span><span class=line><span class=cl>    <span class=c1># TODO: AfterProcessingTime not implemented yet, replace AfterCount</span>
</span></span><span class=line><span class=cl>    <span class=k>return</span> <span class=p>(</span>
</span></span><span class=line><span class=cl>        <span class=n>pcoll</span>
</span></span><span class=line><span class=cl>        <span class=c1># Get periodic results every ten events.</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;LeaderboardUserGlobalWindows&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>WindowInto</span><span class=p>(</span>
</span></span><span class=line><span class=cl>            <span class=n>beam</span><span class=o>.</span><span class=n>window</span><span class=o>.</span><span class=n>GlobalWindows</span><span class=p>(),</span>
</span></span><span class=line><span class=cl>            <span class=n>trigger</span><span class=o>=</span><span class=n>trigger</span><span class=o>.</span><span class=n>Repeatedly</span><span class=p>(</span><span class=n>trigger</span><span class=o>.</span><span class=n>AfterCount</span><span class=p>(</span><span class=mi>10</span><span class=p>)),</span>
</span></span><span class=line><span class=cl>            <span class=n>accumulation_mode</span><span class=o>=</span><span class=n>trigger</span><span class=o>.</span><span class=n>AccumulationMode</span><span class=o>.</span><span class=n>ACCUMULATING</span><span class=p>)</span>
</span></span><span class=line><span class=cl>        <span class=c1># Extract and sum username/score pairs from the event data.</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;ExtractAndSumScore&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>ExtractAndSumScore</span><span class=p>(</span><span class=s1>&#39;user&#39;</span><span class=p>))</span></span></span></code></pre></div></div></div><p><code>LeaderBoard</code> sets the <a href=/documentation/programming-guide/#window-accumulation-modes>window accumulation mode</a> to accumulate window panes as the trigger fires. This accumulation mode is set by <span class=language-java>invoking <code>.accumulatingFiredPanes</code></span> <span class=language-py>using <code>accumulation_mode=trigger.AccumulationMode.ACCUMULATING</code></span> when setting the trigger, and causes the pipeline to accumulate the previously emitted data together with any new data that&rsquo;s arrived since the last trigger fire. This ensures that <code>LeaderBoard</code> outputs a running sum of each user&rsquo;s scores, rather than a collection of individual sums.</p><h4 id=calculating-team-score-based-on-event-time>Calculating Team Score based on Event Time</h4><p>We want our pipeline to also output the total score for each team during each hour of play. Unlike the user score calculation, for team scores, we care about when in <em>event</em> time each score actually occurred, because we want to consider each hour of play individually. We also want to provide speculative updates as each individual hour progresses, and to allow any instances of late data (data that arrives after a given hour&rsquo;s data is considered complete) to be included in our calculation.</p><p>Because we consider each hour individually, we can apply fixed-time windowing to our input data, just like in <code>HourlyTeamScore</code>. To provide the speculative updates and updates on late data, we&rsquo;ll specify additional trigger parameters. 
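</p><p class=language-py>To make the fixed-time windowing step concrete, here is a small plain-Python sketch that does not use Beam. The function name <code>hourly_team_totals</code>, the tuple layout, and the use of integer seconds for event time are assumptions made for illustration. It assigns each score to an hour-long window by rounding its event timestamp down, and it ignores triggers entirely.</p><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py>from collections import defaultdict

HOUR = 60 * 60  # window length, in seconds of event time


def hourly_team_totals(events, window_size=HOUR):
    """Sum scores per (window_start, team), assigning windows by event time."""
    totals = defaultdict(int)
    for event_time, team, score in events:
        # Round the event timestamp down to the start of its window.
        window_start = event_time - event_time % window_size
        totals[(window_start, team)] += score
    return dict(totals)


# The last event arrives out of order but still lands in the first hour.
events = [(10, 'red', 4), (3599, 'red', 1), (3600, 'blue', 7), (50, 'blue', 2)]
print(hourly_team_totals(events))
</code></pre></div></div></div><p class=language-py>Because the window is derived from the event timestamp alone, the order in which events arrive does not change which hour they are counted in.</p><p>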
The trigger will cause each window to calculate and emit results at an interval we specify (in this case, every five minutes), and also to keep triggering after the window is considered &ldquo;complete&rdquo; to account for late data. Just like the user score calculation, we&rsquo;ll set the trigger to accumulating mode to ensure that we get a running sum for each hour-long window.</p><p>The triggers for speculative updates and late data help with the problem of <a href=/documentation/programming-guide/#windowing>time skew</a>. Events in the pipeline aren&rsquo;t necessarily processed in the order in which they actually occurred according to their timestamps; they may arrive in the pipeline out of order, or late (in our case, because they were generated while the user&rsquo;s phone was out of contact with a network). Beam needs a way to determine when it can reasonably assume that it has &ldquo;all&rdquo; of the data in a given window: this is called the <em>watermark</em>.</p><p>In an ideal world, all data would be processed immediately when it occurs, so the processing time would be equal to (or at least have a linear relationship to) the event time. However, because distributed systems contain some inherent inaccuracy (like our late-reporting phones), Beam often uses a heuristic watermark.</p><p>The following diagram shows the relationship between ongoing processing time and each score&rsquo;s event time for two teams:</p><img src=/images/gaming-example-event-time-narrow.gif alt="A pipeline processes score data by team, windowed by event time." width=800px><p><em>Figure 5: Score data by team, windowed by event time. A trigger based on
processing time causes the window to emit speculative early results and include
late results.</em></p><p>The dotted line in the diagram is the &ldquo;ideal&rdquo; <strong>watermark</strong>: Beam&rsquo;s notion of when all data in a given window can reasonably be considered to have arrived. The irregular solid line represents the actual watermark, as determined by the data source.</p><p>Data arriving above the solid watermark line is <em>late data</em>: score events that were delayed (perhaps generated offline) and arrived after the window to which they belong had closed. Our pipeline&rsquo;s late-firing trigger ensures that this late data is still included in the sum.</p><p>The following code example shows how <code>LeaderBoard</code> applies fixed-time windowing with the appropriate triggers to have our pipeline perform the calculations we want:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=c1>// Extract team/score pairs from the event stream, using hour-long windows by default.
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=nd>@VisibleForTesting</span>
</span></span><span class=line><span class=cl><span class=kd>static</span> <span class=kd>class</span> <span class=nc>CalculateTeamScores</span>
</span></span><span class=line><span class=cl>    <span class=kd>extends</span> <span class=n>PTransform</span><span class=o>&lt;</span><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>GameActionInfo</span><span class=o>&gt;,</span> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Integer</span><span class=o>&gt;&gt;&gt;</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>  <span class=kd>private</span> <span class=kd>final</span> <span class=n>Duration</span> <span class=n>teamWindowDuration</span><span class=o>;</span>
</span></span><span class=line><span class=cl>  <span class=kd>private</span> <span class=kd>final</span> <span class=n>Duration</span> <span class=n>allowedLateness</span><span class=o>;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=n>CalculateTeamScores</span><span class=o>(</span><span class=n>Duration</span> <span class=n>teamWindowDuration</span><span class=o>,</span> <span class=n>Duration</span> <span class=n>allowedLateness</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>    <span class=k>this</span><span class=o>.</span><span class=na>teamWindowDuration</span> <span class=o>=</span> <span class=n>teamWindowDuration</span><span class=o>;</span>
</span></span><span class=line><span class=cl>    <span class=k>this</span><span class=o>.</span><span class=na>allowedLateness</span> <span class=o>=</span> <span class=n>allowedLateness</span><span class=o>;</span>
</span></span><span class=line><span class=cl>  <span class=o>}</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=nd>@Override</span>
</span></span><span class=line><span class=cl>  <span class=kd>public</span> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Integer</span><span class=o>&gt;&gt;</span> <span class=nf>expand</span><span class=o>(</span><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>GameActionInfo</span><span class=o>&gt;</span> <span class=n>infos</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>    <span class=k>return</span> <span class=n>infos</span>
</span></span><span class=line><span class=cl>        <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>            <span class=s>&#34;LeaderboardTeamFixedWindows&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>            <span class=n>Window</span><span class=o>.&lt;</span><span class=n>GameActionInfo</span><span class=o>&gt;</span><span class=n>into</span><span class=o>(</span><span class=n>FixedWindows</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>teamWindowDuration</span><span class=o>))</span>
</span></span><span class=line><span class=cl>                <span class=c1>// We will get early (speculative) results as well as cumulative
</span></span></span><span class=line><span class=cl><span class=c1></span>                <span class=c1>// processing of late data.
</span></span></span><span class=line><span class=cl><span class=c1></span>                <span class=o>.</span><span class=na>triggering</span><span class=o>(</span>
</span></span><span class=line><span class=cl>                    <span class=n>AfterWatermark</span><span class=o>.</span><span class=na>pastEndOfWindow</span><span class=o>()</span>
</span></span><span class=line><span class=cl>                        <span class=o>.</span><span class=na>withEarlyFirings</span><span class=o>(</span>
</span></span><span class=line><span class=cl>                            <span class=n>AfterProcessingTime</span><span class=o>.</span><span class=na>pastFirstElementInPane</span><span class=o>()</span>
</span></span><span class=line><span class=cl>                                <span class=o>.</span><span class=na>plusDelayOf</span><span class=o>(</span><span class=n>FIVE_MINUTES</span><span class=o>))</span>
</span></span><span class=line><span class=cl>                        <span class=o>.</span><span class=na>withLateFirings</span><span class=o>(</span>
</span></span><span class=line><span class=cl>                            <span class=n>AfterProcessingTime</span><span class=o>.</span><span class=na>pastFirstElementInPane</span><span class=o>()</span>
</span></span><span class=line><span class=cl>                                <span class=o>.</span><span class=na>plusDelayOf</span><span class=o>(</span><span class=n>TEN_MINUTES</span><span class=o>)))</span>
</span></span><span class=line><span class=cl>                <span class=o>.</span><span class=na>withAllowedLateness</span><span class=o>(</span><span class=n>allowedLateness</span><span class=o>)</span>
</span></span><span class=line><span class=cl>                <span class=o>.</span><span class=na>accumulatingFiredPanes</span><span class=o>())</span>
</span></span><span class=line><span class=cl>        <span class=c1>// Extract and sum teamname/score pairs from the event data.
</span></span></span><span class=line><span class=cl><span class=c1></span>        <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;ExtractTeamScore&#34;</span><span class=o>,</span> <span class=k>new</span> <span class=n>ExtractAndSumScore</span><span class=o>(</span><span class=s>&#34;team&#34;</span><span class=o>));</span>
</span></span><span class=line><span class=cl>  <span class=o>}</span>
</span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=k>class</span> <span class=nc>CalculateTeamScores</span><span class=p>(</span><span class=n>beam</span><span class=o>.</span><span class=n>PTransform</span><span class=p>):</span>
</span></span><span class=line><span class=cl>  <span class=s2>&#34;&#34;&#34;Calculates scores for each team within the configured window duration.
</span></span></span><span class=line><span class=cl><span class=s2>
</span></span></span><span class=line><span class=cl><span class=s2>  Extract team/score pairs from the event stream, using hour-long windows by
</span></span></span><span class=line><span class=cl><span class=s2>  default.
</span></span></span><span class=line><span class=cl><span class=s2>  &#34;&#34;&#34;</span>
</span></span><span class=line><span class=cl>  <span class=k>def</span> <span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>team_window_duration</span><span class=p>,</span> <span class=n>allowed_lateness</span><span class=p>):</span>
</span></span><span class=line><span class=cl>    <span class=c1># TODO(BEAM-6158): Revert the workaround once we can pickle super() on py3.</span>
</span></span><span class=line><span class=cl>    <span class=c1># super().__init__()</span>
</span></span><span class=line><span class=cl>    <span class=n>beam</span><span class=o>.</span><span class=n>PTransform</span><span class=o>.</span><span class=fm>__init__</span><span class=p>(</span><span class=bp>self</span><span class=p>)</span>
</span></span><span class=line><span class=cl>    <span class=bp>self</span><span class=o>.</span><span class=n>team_window_duration</span> <span class=o>=</span> <span class=n>team_window_duration</span> <span class=o>*</span> <span class=mi>60</span>
</span></span><span class=line><span class=cl>    <span class=bp>self</span><span class=o>.</span><span class=n>allowed_lateness_seconds</span> <span class=o>=</span> <span class=n>allowed_lateness</span> <span class=o>*</span> <span class=mi>60</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=k>def</span> <span class=nf>expand</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>pcoll</span><span class=p>):</span>
</span></span><span class=line><span class=cl>    <span class=c1># NOTE: the behavior does not exactly match the Java example</span>
</span></span><span class=line><span class=cl>    <span class=c1># TODO: allowed_lateness not implemented yet in FixedWindows</span>
</span></span><span class=line><span class=cl>    <span class=c1># TODO: AfterProcessingTime not implemented yet, replace AfterCount</span>
</span></span><span class=line><span class=cl>    <span class=k>return</span> <span class=p>(</span>
</span></span><span class=line><span class=cl>        <span class=n>pcoll</span>
</span></span><span class=line><span class=cl>        <span class=c1># We will get early (speculative) results as well as cumulative</span>
</span></span><span class=line><span class=cl>        <span class=c1># processing of late data.</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;LeaderboardTeamFixedWindows&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>WindowInto</span><span class=p>(</span>
</span></span><span class=line><span class=cl>            <span class=n>beam</span><span class=o>.</span><span class=n>window</span><span class=o>.</span><span class=n>FixedWindows</span><span class=p>(</span><span class=bp>self</span><span class=o>.</span><span class=n>team_window_duration</span><span class=p>),</span>
</span></span><span class=line><span class=cl>            <span class=n>trigger</span><span class=o>=</span><span class=n>trigger</span><span class=o>.</span><span class=n>AfterWatermark</span><span class=p>(</span>
</span></span><span class=line><span class=cl>                <span class=n>trigger</span><span class=o>.</span><span class=n>AfterCount</span><span class=p>(</span><span class=mi>10</span><span class=p>),</span> <span class=n>trigger</span><span class=o>.</span><span class=n>AfterCount</span><span class=p>(</span><span class=mi>20</span><span class=p>)),</span>
</span></span><span class=line><span class=cl>            <span class=n>accumulation_mode</span><span class=o>=</span><span class=n>trigger</span><span class=o>.</span><span class=n>AccumulationMode</span><span class=o>.</span><span class=n>ACCUMULATING</span><span class=p>)</span>
</span></span><span class=line><span class=cl>        <span class=c1># Extract and sum teamname/score pairs from the event data.</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;ExtractAndSumScore&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>ExtractAndSumScore</span><span class=p>(</span><span class=s1>&#39;team&#39;</span><span class=p>))</span></span></span></code></pre></div></div></div><p>Taken together, these processing strategies let us address the latency and completeness issues present in the <code>UserScore</code> and <code>HourlyTeamScore</code> pipelines, while still using the same basic transforms to process the data. In fact, both calculations still use the same <code>ExtractAndSumScore</code> transform that we used in both the <code>UserScore</code> and <code>HourlyTeamScore</code> pipelines.</p><h2 id=gamestats-abuse-detection-and-usage-analysis>GameStats: Abuse Detection and Usage Analysis</h2><p>While <code>LeaderBoard</code> demonstrates how to use basic windowing and triggers to perform low-latency and flexible data analysis, we can use more advanced windowing techniques to perform more comprehensive analysis. This might include some calculations designed to detect system abuse (like spam) or to gain insight into user behavior. The <code>GameStats</code> pipeline builds on the low-latency functionality in <code>LeaderBoard</code> to demonstrate how you can use Beam to perform this kind of advanced analysis.</p><p>Like <code>LeaderBoard</code>, <code>GameStats</code> reads data from an unbounded source. 
It is best thought of as an ongoing job that provides insight into the game as users play.</p><p class=language-java><blockquote><p><strong>Note:</strong> See <a href=https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/complete/game/GameStats.java>GameStats on GitHub</a> for the complete example pipeline program.</p></blockquote></p><p class=language-py><blockquote><p><strong>Note:</strong> See <a href=https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/complete/game/game_stats.py>GameStats on GitHub</a> for the complete example pipeline program.</p></blockquote></p><h3 id=what-does-gamestats-do>What Does GameStats Do?</h3><p>Like <code>LeaderBoard</code>, <code>GameStats</code> calculates the total score per team, per hour. However, the pipeline also performs two kinds of more complex analysis:</p><ul><li><code>GameStats</code> includes an <strong>abuse detection</strong> system that performs some simple statistical analysis on the score data to determine which users, if any, might be spammers or bots. It then uses the list of suspected spam/bot users to filter the bots out of the hourly team score calculation.</li><li><code>GameStats</code> <strong>analyzes usage patterns</strong> by grouping together game data that share similar event times using session windowing. This lets us gain some insight into how long users tend to play, and how game length changes over time.</li></ul><p>Below, we&rsquo;ll look at these features in more detail.</p><h4 id=abuse-detection>Abuse Detection</h4><p>Let&rsquo;s suppose scoring in our game depends on the speed at which a user can &ldquo;click&rdquo; on their phone. <code>GameStats</code>&rsquo;s abuse detection analyzes each user&rsquo;s score data to detect whether a user has an abnormally high &ldquo;click rate&rdquo; and thus an abnormally high score. 
This might indicate that the game is being played by a bot that operates significantly faster than a human could play.</p><p>To determine whether or not a score is &ldquo;abnormally&rdquo; high, <code>GameStats</code> calculates the average of all scores in each fixed-time window, and then checks each individual score against the average score multiplied by an arbitrary weight factor (in our case, 2.5). Thus, any score more than 2.5 times the average is deemed to be the product of spam. The <code>GameStats</code> pipeline tracks a list of &ldquo;spam&rdquo; users and filters those users out of the team score calculations for the team leader board.</p><p>Since the average depends on the pipeline data, we need to calculate it, and then use that calculated data in a subsequent <code>ParDo</code> transform that filters scores that exceed the weighted value. To do this, we can pass the calculated average as a <a href=/documentation/programming-guide/#side-inputs>side input</a> to the filtering <code>ParDo</code>.</p><p>The following code example shows the composite transform that handles abuse detection. The transform uses the <code>Sum.integersPerKey</code> transform to sum all scores per user, and then the <code>Mean.globally</code> transform to determine the average score for all users. Once that&rsquo;s been calculated (as a <code>PCollectionView</code> singleton), we can pass it to the filtering <code>ParDo</code> using <code>.withSideInputs</code>:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=kd>public</span> <span class=kd>static</span> <span class=kd>class</span> <span class=nc>CalculateSpammyUsers</span>
</span></span><span class=line><span class=cl>    <span class=kd>extends</span> <span class=n>PTransform</span><span class=o>&lt;</span><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Integer</span><span class=o>&gt;&gt;,</span> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Integer</span><span class=o>&gt;&gt;&gt;</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>  <span class=kd>private</span> <span class=kd>static</span> <span class=kd>final</span> <span class=n>Logger</span> <span class=n>LOG</span> <span class=o>=</span> <span class=n>LoggerFactory</span><span class=o>.</span><span class=na>getLogger</span><span class=o>(</span><span class=n>CalculateSpammyUsers</span><span class=o>.</span><span class=na>class</span><span class=o>);</span>
</span></span><span class=line><span class=cl>  <span class=kd>private</span> <span class=kd>static</span> <span class=kd>final</span> <span class=kt>double</span> <span class=n>SCORE_WEIGHT</span> <span class=o>=</span> <span class=n>2</span><span class=o>.</span><span class=na>5</span><span class=o>;</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=nd>@Override</span>
</span></span><span class=line><span class=cl>  <span class=kd>public</span> <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Integer</span><span class=o>&gt;&gt;</span> <span class=nf>expand</span><span class=o>(</span><span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Integer</span><span class=o>&gt;&gt;</span> <span class=n>userScores</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>    <span class=c1>// Get the sum of scores for each user.
</span></span></span><span class=line><span class=cl><span class=c1></span>    <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Integer</span><span class=o>&gt;&gt;</span> <span class=n>sumScores</span> <span class=o>=</span>
</span></span><span class=line><span class=cl>        <span class=n>userScores</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;UserSum&#34;</span><span class=o>,</span> <span class=n>Sum</span><span class=o>.</span><span class=na>integersPerKey</span><span class=o>());</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>    <span class=c1>// Extract the score from each element, and use it to find the global mean.
</span></span></span><span class=line><span class=cl><span class=c1></span>    <span class=kd>final</span> <span class=n>PCollectionView</span><span class=o>&lt;</span><span class=n>Double</span><span class=o>&gt;</span> <span class=n>globalMeanScore</span> <span class=o>=</span>
</span></span><span class=line><span class=cl>        <span class=n>sumScores</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>Values</span><span class=o>.</span><span class=na>create</span><span class=o>()).</span><span class=na>apply</span><span class=o>(</span><span class=n>Mean</span><span class=o>.&lt;</span><span class=n>Integer</span><span class=o>&gt;</span><span class=n>globally</span><span class=o>().</span><span class=na>asSingletonView</span><span class=o>());</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>    <span class=c1>// Filter the user sums using the global mean.
</span></span></span><span class=line><span class=cl><span class=c1></span>    <span class=n>PCollection</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Integer</span><span class=o>&gt;&gt;</span> <span class=n>filtered</span> <span class=o>=</span>
</span></span><span class=line><span class=cl>        <span class=n>sumScores</span><span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>            <span class=s>&#34;ProcessAndFilter&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>            <span class=n>ParDo</span>
</span></span><span class=line><span class=cl>                <span class=c1>// use the derived mean total score as a side input
</span></span></span><span class=line><span class=cl><span class=c1></span>                <span class=o>.</span><span class=na>of</span><span class=o>(</span>
</span></span><span class=line><span class=cl>                    <span class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Integer</span><span class=o>&gt;,</span> <span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Integer</span><span class=o>&gt;&gt;()</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>                      <span class=kd>private</span> <span class=kd>final</span> <span class=n>Counter</span> <span class=n>numSpammerUsers</span> <span class=o>=</span>
</span></span><span class=line><span class=cl>                          <span class=n>Metrics</span><span class=o>.</span><span class=na>counter</span><span class=o>(</span><span class=s>&#34;main&#34;</span><span class=o>,</span> <span class=s>&#34;SpammerUsers&#34;</span><span class=o>);</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>                      <span class=nd>@ProcessElement</span>
</span></span><span class=line><span class=cl>                      <span class=kd>public</span> <span class=kt>void</span> <span class=nf>processElement</span><span class=o>(</span><span class=n>ProcessContext</span> <span class=n>c</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>                        <span class=n>Integer</span> <span class=n>score</span> <span class=o>=</span> <span class=n>c</span><span class=o>.</span><span class=na>element</span><span class=o>().</span><span class=na>getValue</span><span class=o>();</span>
</span></span><span class=line><span class=cl>                        <span class=n>Double</span> <span class=n>gmc</span> <span class=o>=</span> <span class=n>c</span><span class=o>.</span><span class=na>sideInput</span><span class=o>(</span><span class=n>globalMeanScore</span><span class=o>);</span>
</span></span><span class=line><span class=cl>                        <span class=k>if</span> <span class=o>(</span><span class=n>score</span> <span class=o>&gt;</span> <span class=o>(</span><span class=n>gmc</span> <span class=o>*</span> <span class=n>SCORE_WEIGHT</span><span class=o>))</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>                          <span class=n>LOG</span><span class=o>.</span><span class=na>info</span><span class=o>(</span>
</span></span><span class=line><span class=cl>                              <span class=s>&#34;user &#34;</span>
</span></span><span class=line><span class=cl>                                  <span class=o>+</span> <span class=n>c</span><span class=o>.</span><span class=na>element</span><span class=o>().</span><span class=na>getKey</span><span class=o>()</span>
</span></span><span class=line><span class=cl>                                  <span class=o>+</span> <span class=s>&#34; spammer score &#34;</span>
</span></span><span class=line><span class=cl>                                  <span class=o>+</span> <span class=n>score</span>
</span></span><span class=line><span class=cl>                                  <span class=o>+</span> <span class=s>&#34; with mean &#34;</span>
</span></span><span class=line><span class=cl>                                  <span class=o>+</span> <span class=n>gmc</span><span class=o>);</span>
</span></span><span class=line><span class=cl>                          <span class=n>numSpammerUsers</span><span class=o>.</span><span class=na>inc</span><span class=o>();</span>
</span></span><span class=line><span class=cl>                          <span class=n>c</span><span class=o>.</span><span class=na>output</span><span class=o>(</span><span class=n>c</span><span class=o>.</span><span class=na>element</span><span class=o>());</span>
</span></span><span class=line><span class=cl>                        <span class=o>}</span>
</span></span><span class=line><span class=cl>                      <span class=o>}</span>
</span></span><span class=line><span class=cl>                    <span class=o>})</span>
</span></span><span class=line><span class=cl>                <span class=o>.</span><span class=na>withSideInputs</span><span class=o>(</span><span class=n>globalMeanScore</span><span class=o>));</span>
</span></span><span class=line><span class=cl>    <span class=k>return</span> <span class=n>filtered</span><span class=o>;</span>
</span></span><span class=line><span class=cl>  <span class=o>}</span>
</span></span><span class=line><span class=cl><span class=o>}</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=k>class</span> <span class=nc>CalculateSpammyUsers</span><span class=p>(</span><span class=n>beam</span><span class=o>.</span><span class=n>PTransform</span><span class=p>):</span>
</span></span><span class=line><span class=cl>  <span class=s2>&#34;&#34;&#34;Filter out all but those users with a high clickrate, which we will
</span></span></span><span class=line><span class=cl><span class=s2>  consider as &#39;spammy&#39; users.
</span></span></span><span class=line><span class=cl><span class=s2>
</span></span></span><span class=line><span class=cl><span class=s2>  We do this by finding the mean total score per user, then using that
</span></span></span><span class=line><span class=cl><span class=s2>  information as a side input to filter out all but those user scores that are
</span></span></span><span class=line><span class=cl><span class=s2>  larger than (mean * SCORE_WEIGHT).
</span></span></span><span class=line><span class=cl><span class=s2>  &#34;&#34;&#34;</span>
</span></span><span class=line><span class=cl>  <span class=n>SCORE_WEIGHT</span> <span class=o>=</span> <span class=mf>2.5</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>  <span class=k>def</span> <span class=nf>expand</span><span class=p>(</span><span class=bp>self</span><span class=p>,</span> <span class=n>user_scores</span><span class=p>):</span>
</span></span><span class=line><span class=cl>    <span class=c1># Get the sum of scores for each user.</span>
</span></span><span class=line><span class=cl>    <span class=n>sum_scores</span> <span class=o>=</span> <span class=p>(</span><span class=n>user_scores</span> <span class=o>|</span> <span class=s1>&#39;SumUsersScores&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>CombinePerKey</span><span class=p>(</span><span class=nb>sum</span><span class=p>))</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>    <span class=c1># Extract the score from each element, and use it to find the global mean.</span>
</span></span><span class=line><span class=cl>    <span class=n>global_mean_score</span> <span class=o>=</span> <span class=p>(</span>
</span></span><span class=line><span class=cl>        <span class=n>sum_scores</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>Values</span><span class=p>()</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>CombineGlobally</span><span class=p>(</span><span class=n>beam</span><span class=o>.</span><span class=n>combiners</span><span class=o>.</span><span class=n>MeanCombineFn</span><span class=p>())</span>\
</span></span><span class=line><span class=cl>            <span class=o>.</span><span class=n>as_singleton_view</span><span class=p>())</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>    <span class=c1># Filter the user sums using the global mean.</span>
</span></span><span class=line><span class=cl>    <span class=n>filtered</span> <span class=o>=</span> <span class=p>(</span>
</span></span><span class=line><span class=cl>        <span class=n>sum_scores</span>
</span></span><span class=line><span class=cl>        <span class=c1># Use the derived mean total score (global_mean_score) as a side input.</span>
</span></span><span class=line><span class=cl>        <span class=o>|</span> <span class=s1>&#39;ProcessAndFilter&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>Filter</span><span class=p>(</span>
</span></span><span class=line><span class=cl>            <span class=k>lambda</span> <span class=n>key_score</span><span class=p>,</span> <span class=n>global_mean</span><span class=p>:</span>\
</span></span><span class=line><span class=cl>                <span class=n>key_score</span><span class=p>[</span><span class=mi>1</span><span class=p>]</span> <span class=o>&gt;</span> <span class=n>global_mean</span> <span class=o>*</span> <span class=bp>self</span><span class=o>.</span><span class=n>SCORE_WEIGHT</span><span class=p>,</span>
</span></span><span class=line><span class=cl>            <span class=n>global_mean_score</span><span class=p>))</span>
</span></span><span class=line><span class=cl>    <span class=k>return</span> <span class=n>filtered</span></span></span></code></pre></div></div></div><p>The abuse-detection transform generates a view of users suspected to be spambots. Later in the pipeline, we use that view to filter out any such users when we calculate the team score per hour, again by using the side input mechanism. The following code example shows where we insert the spam filter, between windowing the scores into fixed windows and extracting the team scores:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=c1>// Calculate the total score per team over fixed windows,
</span></span></span><span class=line><span class=cl><span class=c1>// and emit cumulative updates for late data. Uses the side input derived above-- the set of
</span></span></span><span class=line><span class=cl><span class=c1>// suspected robots-- to filter out scores from those users from the sum.
</span></span></span><span class=line><span class=cl><span class=c1>// Write the results to BigQuery.
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>rawEvents</span>
</span></span><span class=line><span class=cl>    <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>        <span class=s>&#34;WindowIntoFixedWindows&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>        <span class=n>Window</span><span class=o>.</span><span class=na>into</span><span class=o>(</span>
</span></span><span class=line><span class=cl>            <span class=n>FixedWindows</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>Duration</span><span class=o>.</span><span class=na>standardMinutes</span><span class=o>(</span><span class=n>options</span><span class=o>.</span><span class=na>getFixedWindowDuration</span><span class=o>()))))</span>
</span></span><span class=line><span class=cl>    <span class=c1>// Filter out the detected spammer users, using the side input derived above.
</span></span></span><span class=line><span class=cl><span class=c1></span>    <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>        <span class=s>&#34;FilterOutSpammers&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>        <span class=n>ParDo</span><span class=o>.</span><span class=na>of</span><span class=o>(</span>
</span></span><span class=line><span class=cl>                <span class=k>new</span> <span class=n>DoFn</span><span class=o>&lt;</span><span class=n>GameActionInfo</span><span class=o>,</span> <span class=n>GameActionInfo</span><span class=o>&gt;()</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>                  <span class=nd>@ProcessElement</span>
</span></span><span class=line><span class=cl>                  <span class=kd>public</span> <span class=kt>void</span> <span class=nf>processElement</span><span class=o>(</span><span class=n>ProcessContext</span> <span class=n>c</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>                    <span class=c1>// If the user is not in the spammers Map, output the data element.
</span></span></span><span class=line><span class=cl><span class=c1></span>                    <span class=k>if</span> <span class=o>(</span><span class=n>c</span><span class=o>.</span><span class=na>sideInput</span><span class=o>(</span><span class=n>spammersView</span><span class=o>).</span><span class=na>get</span><span class=o>(</span><span class=n>c</span><span class=o>.</span><span class=na>element</span><span class=o>().</span><span class=na>getUser</span><span class=o>().</span><span class=na>trim</span><span class=o>())</span> <span class=o>==</span> <span class=kc>null</span><span class=o>)</span> <span class=o>{</span>
</span></span><span class=line><span class=cl>                      <span class=n>c</span><span class=o>.</span><span class=na>output</span><span class=o>(</span><span class=n>c</span><span class=o>.</span><span class=na>element</span><span class=o>());</span>
</span></span><span class=line><span class=cl>                    <span class=o>}</span>
</span></span><span class=line><span class=cl>                  <span class=o>}</span>
</span></span><span class=line><span class=cl>                <span class=o>})</span>
</span></span><span class=line><span class=cl>            <span class=o>.</span><span class=na>withSideInputs</span><span class=o>(</span><span class=n>spammersView</span><span class=o>))</span>
</span></span><span class=line><span class=cl>    <span class=c1>// Extract and sum teamname/score pairs from the event data.
</span></span></span><span class=line><span class=cl><span class=c1></span>    <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;ExtractTeamScore&#34;</span><span class=o>,</span> <span class=k>new</span> <span class=n>ExtractAndSumScore</span><span class=o>(</span><span class=s>&#34;team&#34;</span><span class=o>))</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=c1># Calculate the total score per team over fixed windows, and emit cumulative</span>
</span></span><span class=line><span class=cl><span class=c1># updates for late data. Uses the side input derived above --the set of</span>
</span></span><span class=line><span class=cl><span class=c1># suspected robots-- to filter out scores from those users from the sum.</span>
</span></span><span class=line><span class=cl><span class=c1># Write the results to BigQuery.</span>
</span></span><span class=line><span class=cl><span class=p>(</span>  <span class=c1># pylint: disable=expression-not-assigned</span>
</span></span><span class=line><span class=cl>    <span class=n>raw_events</span>
</span></span><span class=line><span class=cl>    <span class=o>|</span> <span class=s1>&#39;WindowIntoFixedWindows&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>WindowInto</span><span class=p>(</span>
</span></span><span class=line><span class=cl>        <span class=n>beam</span><span class=o>.</span><span class=n>window</span><span class=o>.</span><span class=n>FixedWindows</span><span class=p>(</span><span class=n>fixed_window_duration</span><span class=p>))</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>    <span class=c1># Filter out the detected spammer users, using the side input derived</span>
</span></span><span class=line><span class=cl>    <span class=c1># above</span>
</span></span><span class=line><span class=cl>    <span class=o>|</span> <span class=s1>&#39;FilterOutSpammers&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>Filter</span><span class=p>(</span>
</span></span><span class=line><span class=cl>        <span class=k>lambda</span> <span class=n>elem</span><span class=p>,</span> <span class=n>spammers</span><span class=p>:</span> <span class=n>elem</span><span class=p>[</span><span class=s1>&#39;user&#39;</span><span class=p>]</span> <span class=ow>not</span> <span class=ow>in</span> <span class=n>spammers</span><span class=p>,</span> <span class=n>spammers_view</span><span class=p>)</span>
</span></span><span class=line><span class=cl>    <span class=c1># Extract and sum teamname/score pairs from the event data.</span>
</span></span><span class=line><span class=cl>    <span class=o>|</span> <span class=s1>&#39;ExtractAndSumScore&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>ExtractAndSumScore</span><span class=p>(</span><span class=s1>&#39;team&#39;</span><span class=p>)</span></span></span></code></pre></div></div></div><h4 id=analyzing-usage-patterns>Analyzing Usage Patterns</h4><p>We can gain some insight into when users are playing our game, and for how long, by examining the event times for each game score and grouping scores with similar event times into <em>sessions</em>. <code>GameStats</code> uses Beam&rsquo;s built-in <a href=https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java>session windowing</a> function to group user scores into sessions based on the time they occurred.</p><p>When you set session windowing, you specify a <em>minimum gap duration</em> between events. All events whose event times are closer together than the minimum gap duration are grouped into the same window. Events where the difference in event time is greater than the gap are grouped into separate windows. Depending on how we set our minimum gap duration, we can safely assume that scores in the same session window are part of the same (relatively) uninterrupted stretch of play. Scores in a different window indicate that the user stopped playing the game for at least the minimum gap time before returning to it later.</p><p>The following diagram shows how data might look when grouped into session windows. Unlike fixed windows, session windows are <em>different for each user</em> and depend on each individual user&rsquo;s play pattern:</p><p><img src=/images/gaming-example-session-windows.png alt="User sessions with a minimum gap duration."></p><p><em>Figure 6: User sessions with a minimum gap duration. Each user has different
sessions, according to how many instances they play and how long their breaks
between instances are.</em></p><p>We can use the session-windowed data to determine the average length of uninterrupted play time for all of our users, as well as the total score they achieve during each session. We can do this in the code by first applying session windows, summing the score per user and session, and then using a transform to calculate the length of each individual session:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=c1>// Detect user sessions-- that is, a burst of activity separated by a gap from further
</span></span></span><span class=line><span class=cl><span class=c1>// activity. Find and record the mean session lengths.
</span></span></span><span class=line><span class=cl><span class=c1>// This information could help the game designers track the changing user engagement
</span></span></span><span class=line><span class=cl><span class=c1>// as their set of games changes.
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=n>userEvents</span>
</span></span><span class=line><span class=cl>    <span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>        <span class=s>&#34;WindowIntoSessions&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>        <span class=n>Window</span><span class=o>.&lt;</span><span class=n>KV</span><span class=o>&lt;</span><span class=n>String</span><span class=o>,</span> <span class=n>Integer</span><span class=o>&gt;&gt;</span><span class=n>into</span><span class=o>(</span>
</span></span><span class=line><span class=cl>                <span class=n>Sessions</span><span class=o>.</span><span class=na>withGapDuration</span><span class=o>(</span><span class=n>Duration</span><span class=o>.</span><span class=na>standardMinutes</span><span class=o>(</span><span class=n>options</span><span class=o>.</span><span class=na>getSessionGap</span><span class=o>())))</span>
</span></span><span class=line><span class=cl>            <span class=o>.</span><span class=na>withTimestampCombiner</span><span class=o>(</span><span class=n>TimestampCombiner</span><span class=o>.</span><span class=na>END_OF_WINDOW</span><span class=o>))</span>
</span></span><span class=line><span class=cl>    <span class=c1>// For this use, we care only about the existence of the session, not any particular
</span></span></span><span class=line><span class=cl><span class=c1></span>    <span class=c1>// information aggregated over it, so the following is an efficient way to do that.
</span></span></span><span class=line><span class=cl><span class=c1></span>    <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>Combine</span><span class=o>.</span><span class=na>perKey</span><span class=o>(</span><span class=n>x</span> <span class=o>-&gt;</span> <span class=n>0</span><span class=o>))</span>
</span></span><span class=line><span class=cl>    <span class=c1>// Get the duration per session.
</span></span></span><span class=line><span class=cl><span class=c1></span>    <span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=s>&#34;UserSessionActivity&#34;</span><span class=o>,</span> <span class=n>ParDo</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=k>new</span> <span class=n>UserSessionInfoFn</span><span class=o>()))</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=c1># Detect user sessions-- that is, a burst of activity separated by a gap</span>
</span></span><span class=line><span class=cl><span class=c1># from further activity. Find and record the mean session lengths.</span>
</span></span><span class=line><span class=cl><span class=c1># This information could help the game designers track the changing user</span>
</span></span><span class=line><span class=cl><span class=c1># engagement as their set of games changes.</span>
</span></span><span class=line><span class=cl><span class=p>(</span>  <span class=c1># pylint: disable=expression-not-assigned</span>
</span></span><span class=line><span class=cl>    <span class=n>user_events</span>
</span></span><span class=line><span class=cl>    <span class=o>|</span> <span class=s1>&#39;WindowIntoSessions&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>WindowInto</span><span class=p>(</span>
</span></span><span class=line><span class=cl>        <span class=n>beam</span><span class=o>.</span><span class=n>window</span><span class=o>.</span><span class=n>Sessions</span><span class=p>(</span><span class=n>session_gap</span><span class=p>),</span>
</span></span><span class=line><span class=cl>        <span class=n>timestamp_combiner</span><span class=o>=</span><span class=n>beam</span><span class=o>.</span><span class=n>window</span><span class=o>.</span><span class=n>TimestampCombiner</span><span class=o>.</span><span class=n>OUTPUT_AT_EOW</span><span class=p>)</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>    <span class=c1># For this use, we care only about the existence of the session, not any</span>
</span></span><span class=line><span class=cl>    <span class=c1># particular information aggregated over it, so we can just group by key</span>
</span></span><span class=line><span class=cl>    <span class=c1># and assign a &#34;dummy value&#34; of None.</span>
</span></span><span class=line><span class=cl>    <span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>CombinePerKey</span><span class=p>(</span><span class=k>lambda</span> <span class=n>_</span><span class=p>:</span> <span class=kc>None</span><span class=p>)</span>
</span></span><span class=line><span class=cl>
</span></span><span class=line><span class=cl>    <span class=c1># Get the duration of the session</span>
</span></span><span class=line><span class=cl>    <span class=o>|</span> <span class=s1>&#39;UserSessionActivity&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>ParDo</span><span class=p>(</span><span class=n>UserSessionActivity</span><span class=p>())</span></span></span></code></pre></div></div></div><p>This gives us a set of user sessions, each with an attached duration. We can then calculate the <em>average</em> session length by re-windowing the data into fixed time windows, and then calculating the average for all sessions that end in each hour:</p><div class='language-java snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-java data-lang=java><span class=line><span class=cl><span class=c1>// Re-window to process groups of session sums according to when the sessions complete.
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>    <span class=s>&#34;WindowToExtractSessionMean&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>    <span class=n>Window</span><span class=o>.</span><span class=na>into</span><span class=o>(</span>
</span></span><span class=line><span class=cl>        <span class=n>FixedWindows</span><span class=o>.</span><span class=na>of</span><span class=o>(</span><span class=n>Duration</span><span class=o>.</span><span class=na>standardMinutes</span><span class=o>(</span><span class=n>options</span><span class=o>.</span><span class=na>getUserActivityWindowDuration</span><span class=o>()))))</span>
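</span></span><span class=line><span class=cl><span class=c1>// Find the mean session duration in each window. withoutDefaults() is used
</span></span></span><span class=line><span class=cl><span class=c1>// because the input is in fixed windows, not the global window.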
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=o>.</span><span class=na>apply</span><span class=o>(</span><span class=n>Mean</span><span class=o>.&lt;</span><span class=n>Integer</span><span class=o>&gt;</span><span class=n>globally</span><span class=o>().</span><span class=na>withoutDefaults</span><span class=o>())</span>
</span></span><span class=line><span class=cl><span class=c1>// Write this info to a BigQuery table.
</span></span></span><span class=line><span class=cl><span class=c1></span><span class=o>.</span><span class=na>apply</span><span class=o>(</span>
</span></span><span class=line><span class=cl>    <span class=s>&#34;WriteAvgSessionLength&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>    <span class=k>new</span> <span class=n>WriteWindowedToBigQuery</span><span class=o>&lt;&gt;(</span>
</span></span><span class=line><span class=cl>        <span class=n>options</span><span class=o>.</span><span class=na>as</span><span class=o>(</span><span class=n>GcpOptions</span><span class=o>.</span><span class=na>class</span><span class=o>).</span><span class=na>getProject</span><span class=o>(),</span>
</span></span><span class=line><span class=cl>        <span class=n>options</span><span class=o>.</span><span class=na>getDataset</span><span class=o>(),</span>
</span></span><span class=line><span class=cl>        <span class=n>options</span><span class=o>.</span><span class=na>getGameStatsTablePrefix</span><span class=o>()</span> <span class=o>+</span> <span class=s>&#34;_sessions&#34;</span><span class=o>,</span>
</span></span><span class=line><span class=cl>        <span class=n>configureSessionWindowWrite</span><span class=o>()));</span></span></span></code></pre></div></div></div><div class='language-py snippet'><div class="notebook-skip code-snippet"><a class=copy type=button data-bs-toggle=tooltip data-bs-placement=bottom title="Copy to clipboard"><img src=/images/copy-icon.svg></a><div class=highlight><pre tabindex=0 class=chroma><code class=language-py data-lang=py><span class=line><span class=cl><span class=c1># Re-window to process groups of session sums according to when the</span>
</span></span><span class=line><span class=cl><span class=c1># sessions complete</span>
</span></span><span class=line><span class=cl><span class=o>|</span> <span class=s1>&#39;WindowToExtractSessionMean&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>beam</span><span class=o>.</span><span class=n>WindowInto</span><span class=p>(</span>
</span></span><span class=line><span class=cl>    <span class=n>beam</span><span class=o>.</span><span class=n>window</span><span class=o>.</span><span class=n>FixedWindows</span><span class=p>(</span><span class=n>user_activity_window_duration</span><span class=p>))</span>
</span></span><span class=line><span class=cl>
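</span></span><span class=line><span class=cl><span class=c1># Find the mean session duration in each window. without_defaults() is used</span>
</span></span><span class=line><span class=cl><span class=c1># because the input is in fixed windows, not the global window.</span>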
</span></span><span class=line><span class=cl><span class=o>|</span> <span class=n>beam</span><span class=o>.</span><span class=n>CombineGlobally</span><span class=p>(</span>
</span></span><span class=line><span class=cl>    <span class=n>beam</span><span class=o>.</span><span class=n>combiners</span><span class=o>.</span><span class=n>MeanCombineFn</span><span class=p>())</span><span class=o>.</span><span class=n>without_defaults</span><span class=p>()</span>
</span></span><span class=line><span class=cl><span class=o>|</span> <span class=s1>&#39;FormatAvgSessionLength&#39;</span> <span class=o>&gt;&gt;</span>
</span></span><span class=line><span class=cl>    <span class=n>beam</span><span class=o>.</span><span class=n>Map</span><span class=p>(</span><span class=k>lambda</span> <span class=n>elem</span><span class=p>:</span> <span class=p>{</span><span class=s1>&#39;mean_duration&#39;</span><span class=p>:</span> <span class=nb>float</span><span class=p>(</span><span class=n>elem</span><span class=p>)})</span>
</span></span><span class=line><span class=cl><span class=o>|</span> <span class=s1>&#39;WriteAvgSessionLength&#39;</span> <span class=o>&gt;&gt;</span> <span class=n>WriteToBigQuery</span><span class=p>(</span>
</span></span><span class=line><span class=cl>    <span class=n>args</span><span class=o>.</span><span class=n>table_name</span> <span class=o>+</span> <span class=s1>&#39;_sessions&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>    <span class=n>args</span><span class=o>.</span><span class=n>dataset</span><span class=p>,</span> <span class=p>{</span>
</span></span><span class=line><span class=cl>        <span class=s1>&#39;mean_duration&#39;</span><span class=p>:</span> <span class=s1>&#39;FLOAT&#39;</span><span class=p>,</span>
</span></span><span class=line><span class=cl>    <span class=p>},</span>
</span></span><span class=line><span class=cl>    <span class=n>options</span><span class=o>.</span><span class=n>view_as</span><span class=p>(</span><span class=n>GoogleCloudOptions</span><span class=p>)</span><span class=o>.</span><span class=n>project</span><span class=p>))</span></span></span></code></pre></div></div></div><p>We can use the resulting information to find, for example, what times of day our users are playing the longest, or which stretches of the day are more likely to see shorter play sessions.</p><h2 id=next-steps>Next Steps</h2><ul><li>Take a self-paced tour through our <a href=/documentation/resources/learning-resources>Learning Resources</a>.</li><li>Dive in to some of our favorite <a href=/get-started/resources/videos-and-podcasts>Videos and Podcasts</a>.</li><li>Join the Beam <a href=/community/contact-us>users@</a> mailing list.</li></ul><p>Please don&rsquo;t hesitate to <a href=/community/contact-us>reach out</a> if you encounter any issues!</p><div class=feedback><p class=update>Last updated on 2024/05/26</p></div></div></div><footer class=footer><div class=footer__contained><div class=footer__cols><div class=footer-wrapper><div class=footer__bottom>&copy;
<a href=https://www.apache.org>The Apache Software Foundation</a>
| <a href=/privacy_policy>Privacy Policy</a>
| <a href=/feed.xml>RSS Feed</a><br><br>Apache Beam, Apache, Beam, the Beam logo, and the Apache feather logo are either registered trademarks or trademarks of The Apache Software Foundation. All other products or name brands are trademarks of their respective holders, including The Apache Software Foundation.</div></div></div></div></footer></body></html>