blob: a2164aa6a019bf68ecb70ca5dd48f6f30cb4a04b [file] [log] [blame]
#!groovy
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
t = new TestScripts(args)

/*
 * Run the mobile game examples on Dataflow.
 * https://beam.apache.org/get-started/mobile-gaming-example/
 */
t.describe ('Run Apache Beam Java SDK Mobile Gaming Examples - Dataflow')

QuickstartArchetype.generate(t)
def runner = "DataflowRunner"
String command_output_text

// Polling budget for the batch examples' GCS output: `retries` checks, `waitTime` seconds apart.
int retries = 5
int waitTime = 15 // seconds

/*
 * Polls GCS until at least one object matching "<outputPrefix>*" is listed by `gsutil ls`.
 *
 * outputPrefix: gs:// path prefix of the example's output files.
 * exampleName:  example name reported in the failure message.
 * Throws RuntimeException when nothing is listed after `retries` attempts.
 */
def waitForOutput = { outputPrefix, exampleName ->
  for (int attempt = 0; attempt < retries; attempt++) {
    def files = t.run("gsutil ls ${outputPrefix}*")
    if (files?.trim()) {
      return
    }
    t.intent("Output not found yet. Waiting ${waitTime}s...")
    Thread.sleep(waitTime * 1000)
  }
  throw new RuntimeException("No output files found for ${exampleName} after ${retries * waitTime} seconds.")
}

/**
 * Run the UserScore example on DataflowRunner
 * */
mobileGamingCommands = new MobileGamingCommands(testScripts: t, testRunId: UUID.randomUUID().toString())
t.intent("Running: UserScore example on DataflowRunner")
t.run(mobileGamingCommands.createPipelineCommand("UserScore", runner))
def outputPath = "gs://${t.gcsBucket()}/${mobileGamingCommands.getUserScoreOutputName(runner)}"
waitForOutput(outputPath, "UserScore")
command_output_text = t.run "gsutil cat ${outputPath}* | grep user19_BananaWallaby"
t.see "total_score: 231, user: user19_BananaWallaby", command_output_text
t.success("UserScore successfully run on DataflowRunner.")
t.run "gsutil rm ${outputPath}*"

/**
 * Run the HourlyTeamScore example on DataflowRunner
 * */
// A fresh MobileGamingCommands (new testRunId) per example; this instance is reused by the LeaderBoard runs below.
mobileGamingCommands = new MobileGamingCommands(testScripts: t, testRunId: UUID.randomUUID().toString())
t.intent("Running: HourlyTeamScore example on DataflowRunner")
t.run(mobileGamingCommands.createPipelineCommand("HourlyTeamScore", runner))
outputPath = "gs://${t.gcsBucket()}/${mobileGamingCommands.getHourlyTeamScoreOutputName(runner)}"
waitForOutput(outputPath, "HourlyTeamScore")
command_output_text = t.run "gsutil cat ${outputPath}* | grep AzureBilby "
t.see "total_score: 2788, team: AzureBilby", command_output_text
t.success("HourlyTeamScore successfully run on DataflowRunner.")
t.run "gsutil rm ${outputPath}*"
/**
 * Run the LeaderBoard example on DataflowRunner with and without Streaming Engine
 * */
class LeaderBoardRunner {
  /** Delay between BigQuery table-listing checks while waiting for tables to appear/disappear. */
  private static final long TABLE_POLL_INTERVAL_MS = 3000L
  /** Upper bound on table-listing checks (~5 minutes) so a BigQuery problem fails instead of hanging forever. */
  private static final int TABLE_POLL_MAX_ATTEMPTS = 100

  /**
   * Launches the injector and the LeaderBoard pipeline on Dataflow, then polls BigQuery until
   * {@code t.seeAnyOf(mobileGamingCommands.COLORS, ...)} accepts rows from the user table or
   * {@code mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES} elapses. The output tables are
   * created beforehand if missing and dropped afterwards.
   *
   * @param runner runner name passed to {@code MobileGamingCommands.createPipelineCommand}
   * @param t test harness used to run shell commands and report progress/results
   * @param mobileGamingCommands builds the injector and pipeline command lines
   * @param useStreamingEngine launch the "LeaderBoardWithStreamingEngine" variant when true
   */
  def run(runner, TestScripts t, MobileGamingCommands mobileGamingCommands, boolean useStreamingEngine) {
    String mode = useStreamingEngine ? " with Streaming Engine" : ""
    t.intent("Running: LeaderBoard example on DataflowRunner" + mode)
    def dataset = t.bqDataset()
    def userTable = "leaderboard_DataflowRunner_user"
    def teamTable = "leaderboard_DataflowRunner_team"
    def userSchema = [
        "user:STRING",
        "total_score:INTEGER",
        "processing_time:STRING"
    ].join(",")
    def teamSchema = [
        "team:STRING",
        "total_score:INTEGER",
        "window_start:STRING",
        "processing_time:STRING",
        "timing:STRING"
    ].join(",")

    // Create whichever output tables do not exist yet.
    String tables = listTables(t, dataset)
    if (!tables.contains(userTable)) {
      t.intent("Creating table: ${userTable}")
      t.run("bq mk --table ${dataset}.${userTable} ${userSchema}")
    }
    if (!tables.contains(teamTable)) {
      t.intent("Creating table: ${teamTable}")
      t.run("bq mk --table ${dataset}.${teamTable} ${teamSchema}")
    }
    // Verify that the tables have been created successfully before starting the pipeline.
    waitForTables(t, dataset, "tables ${userTable} and ${teamTable} to be created") { String listing ->
      listing.contains(userTable) && listing.contains(teamTable)
    }
    println "Tables ${userTable} and ${teamTable} created successfully."

    def injectorThread = Thread.start {
      t.run(mobileGamingCommands.createInjectorCommand())
    }
    String jobName = "leaderboard-validation-" + System.currentTimeMillis() + "-" + new Random().nextInt(1000)
    def leaderBoardThread = Thread.start {
      if (useStreamingEngine) {
        t.run(mobileGamingCommands.createPipelineCommand(
            "LeaderBoardWithStreamingEngine", runner, jobName, "LeaderBoard"))
      } else {
        t.run(mobileGamingCommands.createPipelineCommand("LeaderBoard", runner, jobName))
      }
    }

    // verify outputs in BQ tables, checking once a minute until the execution timeout
    def startTime = System.currentTimeMillis()
    boolean isSuccess = false
    String queryResult = ""
    while ((System.currentTimeMillis() - startTime) / 60000 < mobileGamingCommands.EXECUTION_TIMEOUT_IN_MINUTES) {
      try {
        tables = listTables(t, dataset)
        if (tables.contains(userTable) && tables.contains(teamTable)) {
          queryResult = t.run """bq query --batch "SELECT user FROM [${dataset}.${userTable}] LIMIT 10\""""
          if (t.seeAnyOf(mobileGamingCommands.COLORS, queryResult)) {
            isSuccess = true
            break
          }
        }
      } catch (Exception e) {
        // Deliberately best-effort: a failed check is retried on the next iteration until the timeout.
        println "Warning: Exception while checking tables: ${e.message}"
        println "Retrying..."
      }
      println "Waiting for pipeline to produce more results..."
      sleep(60000) // wait for 1 min
    }

    // NOTE(review): Thread.stop() is deprecated and throws UnsupportedOperationException on JDK 20+.
    // It presumably does not kill processes already spawned by t.run either (t.run is not visible
    // here), which is why the Dataflow job is cancelled explicitly below.
    injectorThread.stop()
    leaderBoardThread.stop()
    t.run("""RUNNING_JOB=`gcloud dataflow jobs list | grep ${jobName} | grep Running | cut -d' ' -f1`
if [ ! -z "\${RUNNING_JOB}" ]
then
gcloud dataflow jobs cancel \${RUNNING_JOB}
else
echo "Job '${jobName}' is not running."
fi
""")

    if (isSuccess) {
      t.success("LeaderBoard successfully run on DataflowRunner" + mode + ".")
    } else {
      t.error("FAILED: Failed running LeaderBoard on DataflowRunner" + mode)
    }

    tables = listTables(t, dataset)
    if (tables.contains(userTable)) {
      t.run("bq rm -f -t ${dataset}.${userTable}")
    }
    if (tables.contains(teamTable)) {
      t.run("bq rm -f -t ${dataset}.${teamTable}")
    }
    // It will take a couple of seconds to clean up tables.
    // Wait until they are completely deleted before the next pipeline run.
    waitForTables(t, dataset, "tables ${userTable} and ${teamTable} to be deleted") { String listing ->
      !listing.contains(userTable) && !listing.contains(teamTable)
    }
  }

  /** Returns the `bq query` output listing every table name in {@code dataset} via INFORMATION_SCHEMA. */
  private String listTables(TestScripts t, dataset) {
    return t.run("bq query --use_legacy_sql=false 'SELECT table_name FROM ${dataset}.INFORMATION_SCHEMA.TABLES'")
  }

  /**
   * Re-lists the tables in {@code dataset} every TABLE_POLL_INTERVAL_MS until {@code condition}
   * returns true for the listing.
   *
   * @param description what is being waited for, used in the timeout message
   * @param condition receives the raw table listing and returns true once the wait is over
   * @throws RuntimeException if the condition still fails after TABLE_POLL_MAX_ATTEMPTS re-checks
   */
  private void waitForTables(TestScripts t, dataset, description, Closure<Boolean> condition) {
    String listing = listTables(t, dataset)
    int attempts = 0
    while (!condition.call(listing)) {
      if (++attempts > TABLE_POLL_MAX_ATTEMPTS) {
        throw new RuntimeException("Timed out waiting for ${description} in dataset ${dataset}.")
      }
      sleep(TABLE_POLL_INTERVAL_MS)
      listing = listTables(t, dataset)
    }
  }
}
// Validate LeaderBoard twice: first without Streaming Engine, then with it; then finish the test.
[false, true].each { boolean withStreamingEngine ->
  new LeaderBoardRunner().run(runner, t, mobileGamingCommands, withStreamingEngine)
}
t.done()