| #!groovy | 
 | /* | 
 |  * Licensed to the Apache Software Foundation (ASF) under one | 
 |  * or more contributor license agreements.  See the NOTICE file | 
 |  * distributed with this work for additional information | 
 |  * regarding copyright ownership.  The ASF licenses this file | 
 |  * to you under the Apache License, Version 2.0 (the | 
 |  * "License"); you may not use this file except in compliance | 
 |  * with the License.  You may obtain a copy of the License at | 
 |  * | 
 |  *     http://www.apache.org/licenses/LICENSE-2.0 | 
 |  * | 
 |  * Unless required by applicable law or agreed to in writing, software | 
 |  * distributed under the License is distributed on an "AS IS" BASIS, | 
 |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 |  * See the License for the specific language governing permissions and | 
 |  * limitations under the License. | 
 |  */ | 
 |  | 
 | t = new TestScripts(args) | 
 |  | 
 | /* | 
 |  * Run the mobile game examples on Dataflow. | 
 |  * https://beam.apache.org/get-started/mobile-gaming-example/ | 
 |  */ | 
 |  | 
 | t.describe ('Run Apache Beam Java SDK Mobile Gaming Examples - Dataflow') | 
 |  | 
 | QuickstartArchetype.generate(t) | 
 |  | 
 | def runner = "DataflowRunner" | 
 | String command_output_text | 
 |  | 
 | /** | 
 |  *  Run the UserScore example on DataflowRunner | 
 |  * */ | 
 |  | 
 | mobileGamingCommands = new MobileGamingCommands(testScripts: t, testRunId: UUID.randomUUID().toString()) | 
 |  | 
 | t.intent("Running: UserScore example on DataflowRunner") | 
 | t.run(mobileGamingCommands.createPipelineCommand("UserScore", runner)) | 
 |  | 
 | int retries = 5 | 
 | int waitTime = 15 // seconds | 
 | def outputPath = "gs://${t.gcsBucket()}/${mobileGamingCommands.getUserScoreOutputName(runner)}" | 
 | def outputFound = false | 
 | for (int i = 0; i < retries; i++) { | 
 |   def files = t.run("gsutil ls ${outputPath}*") | 
 |   if (files?.trim()) { | 
 |     outputFound = true | 
 |     break | 
 |   } | 
 |   t.intent("Output not found yet. Waiting ${waitTime}s...") | 
 |   Thread.sleep(waitTime * 1000) | 
 | } | 
 |  | 
 | if (!outputFound) { | 
 |   throw new RuntimeException("No output files found for HourlyTeamScore after ${retries * waitTime} seconds.") | 
 | } | 
 |  | 
 | command_output_text = t.run "gsutil cat ${outputPath}* | grep user19_BananaWallaby" | 
 | t.see "total_score: 231, user: user19_BananaWallaby", command_output_text | 
 | t.success("UserScore successfully run on DataflowRunner.") | 
 | t.run "gsutil rm gs://${t.gcsBucket()}/${mobileGamingCommands.getUserScoreOutputName(runner)}*" | 
 |  | 
 |  | 
 | /** | 
 |  * Run the HourlyTeamScore example on DataflowRunner | 
 |  * */ | 
 |  | 
 | mobileGamingCommands = new MobileGamingCommands(testScripts: t, testRunId: UUID.randomUUID().toString()) | 
 |  | 
 | t.intent("Running: HourlyTeamScore example on DataflowRunner") | 
 | t.run(mobileGamingCommands.createPipelineCommand("HourlyTeamScore", runner)) | 
 |  | 
 | outputPath = "gs://${t.gcsBucket()}/${mobileGamingCommands.getHourlyTeamScoreOutputName(runner)}" | 
 | outputFound = false | 
 | for (int i = 0; i < retries; i++) { | 
 |   def files = t.run("gsutil ls ${outputPath}*") | 
 |   if (files?.trim()) { | 
 |     outputFound = true | 
 |     break | 
 |   } | 
 |   t.intent("Output not found yet. Waiting ${waitTime}s...") | 
 |   Thread.sleep(waitTime * 1000) | 
 | } | 
 |  | 
 | if (!outputFound) { | 
 |   throw new RuntimeException("No output files found for UserScore after ${retries * waitTime} seconds.") | 
 | } | 
 |  | 
 | command_output_text = t.run "gsutil cat ${outputPath}* | grep AzureBilby " | 
 | t.see "total_score: 2788, team: AzureBilby", command_output_text | 
 | t.success("HourlyTeamScore successfully run on DataflowRunner.") | 
 | t.run "gsutil rm gs://${t.gcsBucket()}/${mobileGamingCommands.getHourlyTeamScoreOutputName(runner)}*" | 
 |  | 
 |  | 
 | /** | 
 |  * Run the LeaderBoard example on DataflowRunner with and without Streaming Engine | 
 |  * */ | 
 | class LeaderBoardRunner { | 
 |   def run(runner, TestScripts t, MobileGamingCommands mobileGamingCommands, boolean useStreamingEngine) { | 
 |     t.intent("Running: LeaderBoard example on DataflowRunner" + | 
 |             (useStreamingEngine ? " with Streaming Engine" : "")) | 
 |  | 
 |     def dataset = t.bqDataset() | 
 |     def userTable = "leaderboard_DataflowRunner_user" | 
 |     def teamTable = "leaderboard_DataflowRunner_team" | 
 |     def userSchema = [ | 
 |             "user:STRING", | 
 |             "total_score:INTEGER", | 
 |             "processing_time:STRING" | 
 |     ].join(",") | 
 |     def teamSchema = [ | 
 |             "team:STRING", | 
 |             "total_score:INTEGER", | 
 |             "window_start:STRING", | 
 |             "processing_time:STRING", | 
 |             "timing:STRING" | 
 |     ].join(",") | 
 |  | 
 |     String tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'") | 
 |  | 
 |     if (!tables.contains(userTable)) { | 
 |       t.intent("Creating table: ${userTable}") | 
 |       t.run("bq mk --table ${dataset}.${userTable} ${userSchema}") | 
 |     } | 
 |     if (!tables.contains(teamTable)) { | 
 |       t.intent("Creating table: ${teamTable}") | 
 |       t.run("bq mk --table ${dataset}.${teamTable} ${teamSchema}") | 
 |     } | 
 |  | 
 |     // Verify that the tables have been created successfully | 
 |     tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'") | 
 |     while (!tables.contains(userTable) || !tables.contains(teamTable)) { | 
 |       sleep(3000) | 
 |       tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'") | 
 |     } | 
 |     println "Tables ${userTable} and ${teamTable} created successfully." | 
 |  | 
 |     def InjectorThread = Thread.start() { | 
 |       t.run(mobileGamingCommands.createInjectorCommand()) | 
 |     } | 
 |  | 
 |     String jobName = "leaderboard-validation-" + new Date().getTime() + "-" + new Random().nextInt(1000) | 
 |     def LeaderBoardThread = Thread.start() { | 
 |       if (useStreamingEngine) { | 
 |         t.run(mobileGamingCommands.createPipelineCommand( | 
 |                 "LeaderBoardWithStreamingEngine", runner, jobName, "LeaderBoard")) | 
 |       } else { | 
 |         t.run(mobileGamingCommands.createPipelineCommand("LeaderBoard", runner, jobName)) | 
 |       } | 
 |     } | 
 |  | 
 |     t.run("gcloud dataflow jobs list | grep pyflow-wordstream-candidate | grep Running | cut -d' ' -f1") | 
 |  | 
 |     // verify outputs in BQ tables | 
 |     def startTime = System.currentTimeMillis() | 
 |     def isSuccess = false | 
 |     String query_result = "" | 
 |     while ((System.currentTimeMillis() - startTime) / 60000 < mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) { | 
 |       try { | 
 |         tables = t.run "bq query --use_legacy_sql=false SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES" | 
 |         if (tables.contains(userTable) && tables.contains(teamTable)) { | 
 |           query_result = t.run """bq query --batch "SELECT user FROM [${dataset}.${userTable}] LIMIT 10\"""" | 
 |           if (t.seeAnyOf(mobileGamingCommands.COLORS, query_result)) { | 
 |             isSuccess = true | 
 |             break | 
 |           } | 
 |         } | 
 |       } catch (Exception e) { | 
 |         println "Warning: Exception while checking tables: ${e.message}" | 
 |         println "Retrying..." | 
 |       } | 
 |       println "Waiting for pipeline to produce more results..." | 
 |       sleep(60000) // wait for 1 min | 
 |     } | 
 |     InjectorThread.stop() | 
 |     LeaderBoardThread.stop() | 
 |     t.run("""RUNNING_JOB=`gcloud dataflow jobs list | grep ${jobName} | grep Running | cut -d' ' -f1` | 
 | if [ ! -z "\${RUNNING_JOB}" ]  | 
 |   then  | 
 |     gcloud dataflow jobs cancel \${RUNNING_JOB} | 
 |   else  | 
 |     echo "Job '${jobName}' is not running." | 
 | fi  | 
 | """) | 
 |  | 
 |     if (!isSuccess) { | 
 |       t.error("FAILED: Failed running LeaderBoard on DataflowRunner" + | 
 |               (useStreamingEngine ? " with Streaming Engine" : "")) | 
 |     } | 
 |     t.success("LeaderBoard successfully run on DataflowRunner." + (useStreamingEngine ? " with Streaming Engine" : "")) | 
 |  | 
 |     tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'") | 
 |     if (tables.contains(userTable)) { | 
 |       t.run("bq rm -f -t ${dataset}.${userTable}") | 
 |     } | 
 |     if (tables.contains(teamTable)) { | 
 |       t.run("bq rm -f -t ${dataset}.${teamTable}") | 
 |     } | 
 |  | 
 |     // It will take couple seconds to clean up tables. | 
 |     // This loop makes sure tables are completely deleted before running the pipeline | 
 |     tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'") | 
 |     while (tables.contains(userTable) || tables.contains(teamTable)) { | 
 |       sleep(3000) | 
 |       tables = t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'") | 
 |     } | 
 |   } | 
 | } | 
 |  | 
 | new LeaderBoardRunner().run(runner, t, mobileGamingCommands, false) | 
 | new LeaderBoardRunner().run(runner, t, mobileGamingCommands, true) | 
 |  | 
 | t.done() |