#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This script automates release candidate validation process.
#
# It reads configurations from script.config, checks environment settings and
# runs a list of validation pipelines against multiple runners one after
# another.
#
# NOTE:
# 1. Please set all variables in script.config before running this script.
# 2. Please babysit this script until first pipeline starts.
# Run from the directory containing this script so that relative paths
# (script.config, generated files) resolve correctly.
cd "$(dirname "$0")" || exit 1
# Load user-provided configuration; all variables referenced below come from
# this file (see NOTE in the header).
if [[ ! -f script.config ]]; then
  echo "script.config not found next to this script; please create it first." >&2
  exit 1
fi
. script.config
#######################################
# Prints the final manual steps, restores the ~/.m2/settings.xml and ~/.bashrc
# backups taken earlier in this script, deletes the temporary Beam workspace,
# and deletes GCP resources (GKE Kafka cluster, Pub/Sub topic) if they were
# created. Registered as an EXIT trap below so it runs on any exit path.
# Globals (read): BACKUP_M2, BACKUP_BASHRC, LOCAL_BEAM_DIR, USER_GCP_PROJECT,
#   USER_GCP_REGION, KAFKA_CLUSTER_NAME, SQL_TAXI_TOPIC
#######################################
function clean_up(){
  echo ""
  echo "====================Final Steps===================="
  echo "-----------------Stopping Pubsub Java Injector-----------------"
  echo "Please stop java injector manually."
  echo "-----------------Stopping multi-language quickstart services-----------------"
  echo "Please stop the Java expansion service manually."
  echo "Please stop the Python expansion service manually."
  echo "Please stop the Python portable runner Job server manually."
  echo "-----------------Signing up Spreadsheet-----------------"
  echo "Please open this spreadsheet: https://s.apache.org/beam-release-validation"
  echo "Please sign up your name in the tests you have ran."
  echo "-----------------Final Cleanup-----------------"
  # Restore Maven settings from the backup taken before validation.
  if [[ -f ~/.m2/"${BACKUP_M2}" ]]; then
    rm ~/.m2/settings.xml
    cp ~/.m2/"${BACKUP_M2}" ~/.m2/settings.xml
    rm ~/.m2/"${BACKUP_M2}"
    echo "* Restored ~/.m2/settings.xml"
  fi
  # Restore ~/.bashrc from the backup taken by set_bashrc.
  if [[ -f ~/"${BACKUP_BASHRC}" ]]; then
    rm ~/.bashrc
    cp ~/"${BACKUP_BASHRC}" ~/.bashrc
    rm ~/"${BACKUP_BASHRC}"
    echo "* Restored ~/.bashrc"
  fi
  # Only delete the workspace when LOCAL_BEAM_DIR is a non-empty path; an
  # unquoted/empty value here would make 'rm -rf' dangerous.
  if [[ -n "${LOCAL_BEAM_DIR}" ]]; then
    rm -rf -- "${LOCAL_BEAM_DIR}"
    echo "* Deleted workspace ${LOCAL_BEAM_DIR}"
  fi
  if [[ -n "$(command -v gcloud)" ]]; then
    if [[ -n "${KAFKA_CLUSTER_NAME}" ]]; then
      echo "-----------------------Clean up Kafka Cluster on GKE------------------------"
      gcloud container clusters delete --project="${USER_GCP_PROJECT}" --region="${USER_GCP_REGION}" --async -q "${KAFKA_CLUSTER_NAME}"
    fi
    if [[ -n "${SQL_TAXI_TOPIC}" ]]; then
      echo "-----------------------Clean up pubsub topic on GCP------------------------"
      gcloud pubsub topics delete --project="${USER_GCP_PROJECT}" "${SQL_TAXI_TOPIC}"
    fi
  fi
}
trap clean_up EXIT
# Guard flag: set_bashrc must only modify ~/.bashrc once per run.
setup_bashrc=0

#######################################
# Backs up ~/.bashrc and appends 'export' lines for the validation environment
# variables, so new shells (e.g. the gnome-terminal windows opened later)
# inherit them. Idempotent: a second call is a no-op.
# Globals (read): BACKUP_BASHRC, USER_GCP_PROJECT, USER_GCS_BUCKET,
#   SHARED_PUBSUB_TOPIC, GOOGLE_APPLICATION_CREDENTIALS, RELEASE_VER,
#   LOCAL_BEAM_DIR
# Globals (written): FIXED_WINDOW_DURATION, setup_bashrc
#######################################
function set_bashrc(){
  [[ $setup_bashrc -eq 0 ]] || return
  # [BEAM-4518]: window duration used by the GameStats pipelines below.
  FIXED_WINDOW_DURATION=20
  cp ~/.bashrc ~/"${BACKUP_BASHRC}"
  # ${!v} is indirect expansion: the value of the variable named by $v.
  local v
  for v in USER_GCP_PROJECT USER_GCS_BUCKET SHARED_PUBSUB_TOPIC \
      GOOGLE_APPLICATION_CREDENTIALS RELEASE_VER FIXED_WINDOW_DURATION \
      LOCAL_BEAM_DIR; do
    echo "export ${v}=${!v}" >> ~/.bashrc
  done
  setup_bashrc=1
}
# Derived release identifiers and tool/version constants.
RC_TAG="v${RELEASE_VER}-RC${RC_NUM}"             # git tag of this release candidate
RELEASE_BRANCH="releasev${RELEASE_VER}"          # upstream branch the test PR targets
# Local scratch branch for validation commits; derived from RC_TAG so the two
# always stay in sync.
WORKING_BRANCH="${RC_TAG}_validations"
GIT_REPO_URL=https://github.com/apache/beam.git
PYTHON_RC_DOWNLOAD_URL=https://dist.apache.org/repos/dist/dev/beam
HUB_VERSION=2.12.0
HUB_ARTIFACTS_NAME="hub-linux-amd64-${HUB_VERSION}"
# Timestamped backup names so clean_up can restore the originals on exit.
BACKUP_BASHRC=".bashrc_backup_$(date +"%Y%m%d%H%M%S")"
BACKUP_M2="settings_backup_$(date +"%Y%m%d%H%M%S").xml"
declare -a PYTHON_VERSIONS_TO_VALIDATE=("python3.8")
# Summarize the effective configuration and require explicit user confirmation
# before doing anything destructive.
echo ""
echo "====================Checking Environment & Variables================="
echo "PLEASE update RC_VALIDATE_CONFIGS in file script.config first."
echo ""
echo "running validations on release ${RELEASE_VER} RC${RC_NUM}."
echo "repo URL for this RC: ${REPO_URL}"
echo "using workspace: ${LOCAL_BEAM_DIR}"
# "${arr[*]}" joins the array elements with a single space (default IFS).
echo "validate Python versions: ${PYTHON_VERSIONS_TO_VALIDATE[*]}"
echo ""
echo "All environment and workflow configurations from RC_VALIDATE_CONFIGS:"
for i in "${RC_VALIDATE_CONFIGS[@]}"; do
  # ${!i} is indirect expansion: the value of the variable whose name is in $i.
  echo "$i = ${!i}"
done
echo "TODO(https://github.com/apache/beam/issues/21237): parts of this script launch background processes with gnome-terminal,"
echo "It may not work well over ssh or within a tmux session. Using 'ssh -Y' may help."
echo "[Confirmation Required] Would you like to proceed with current settings? [y|N]"
read -r confirmation
if [[ "$confirmation" != "y" ]]; then
  echo "Please rerun this script and make sure you have the right configurations."
  exit
fi
echo "[Confirmation Required] Would you like to check published Java artifacts (if you've completed this step for this RC previously, you can safely skip this)? [y|N]"
read -r confirmation
if [[ "$confirmation" == "y" ]]; then
  echo "----------------- Checking published Java artifacts (should take ~1 minute) -----------------"
  # Fetch the java-bom POM and extract every beam-* artifactId it declares.
  java_bom=$(curl "${REPO_URL}/org/apache/beam/beam-sdks-java-bom/${RELEASE_VER}/beam-sdks-java-bom-${RELEASE_VER}.pom")
  # Word-splitting of the command substitution into array elements is intended.
  artifacts=( $(echo "$java_bom" | grep -Eo "<artifactId>\S+?</artifactId>" | grep -Eo "beam-[a-zA-Z0-9.-]+") )
  if [[ ${#artifacts[@]} -eq 0 ]]; then
    echo "Couldn't find beam-sdks-java-bom in the generated java artifact."
    echo "Please check ${REPO_URL} and try regenerating the java artifacts in the build_rc step."
    exit 1
  fi
  # Probe each artifact's directory in the staging repo; curl -f fails on
  # HTTP errors so missing artifacts land in FAILED.
  FAILED=()
  for i in "${artifacts[@]}"; do
    curl "${REPO_URL}/org/apache/beam/${i}/${RELEASE_VER}" -f || FAILED+=("$i")
    sleep 0.5  # be gentle with the staging server
  done
  if [[ ${#FAILED[@]} -ne 0 ]]; then
    echo "Failed to find the following artifacts in the generated java artifact, but they were present as dependencies in beam-sdks-java-bom:"
    for i in "${FAILED[@]}"; do
      echo "Artifact: ${i} - url: ${REPO_URL}/org/apache/beam/${i}/${RELEASE_VER}"
    done
    echo "Please check ${REPO_URL} and try regenerating the java artifacts in the build_rc step."
    exit 1
  fi
fi
echo "----------------- Checking git -----------------"
if [[ -z "${GITHUB_TOKEN}" ]]; then
  echo "Error: A Github personal access token is required to perform git push "
  echo "under a newly cloned directory. Please manually create one from Github "
  echo "website with guide:"
  echo "https://help.github.com/en/articles/creating-a-personal-access-token-for-the-command-line"
  echo "Note: This token can be reused in other release scripts."
  # Missing token is an error: exit non-zero so callers can detect it.
  exit 1
else
  if [[ -d "${LOCAL_BEAM_DIR}" ]]; then
    rm -rf -- "${LOCAL_BEAM_DIR}"
  fi
  echo "* Creating local Beam workspace: ${LOCAL_BEAM_DIR}"
  mkdir -p "${LOCAL_BEAM_DIR}"
  echo "* Cloning Beam repo"
  # Clone at the RC tag, then create a scratch working branch from it; nothing
  # below makes sense if either step fails.
  git clone --branch "${RC_TAG}" "${GIT_REPO_URL}" "${LOCAL_BEAM_DIR}" || exit 1
  cd "${LOCAL_BEAM_DIR}" || exit 1
  git checkout -b "${WORKING_BRANCH}" "${RC_TAG}" --quiet
  echo "* Setting up git config"
  # Set upstream repo url with access token included.
  USER_REPO_URL=https://${GITHUB_USERNAME}:${GITHUB_TOKEN}@github.com/${GITHUB_USERNAME}/beam.git
  git remote add "${GITHUB_USERNAME}" "${USER_REPO_URL}"
  # For hub access Github API.
  export GITHUB_TOKEN=${GITHUB_TOKEN}
  # For local git repo only. Required if global configs are not set.
  git config user.name "${GITHUB_USERNAME}"
  git config user.email "${GITHUB_USERNAME}@gmail.com"
fi
echo "-----------------Checking hub-----------------"
if [[ -z "$(command -v hub)" ]]; then
  if [[ "${INSTALL_HUB}" = true ]]; then
    echo "-----------------Installing hub-----------------"
    wget "https://github.com/github/hub/releases/download/v${HUB_VERSION}/${HUB_ARTIFACTS_NAME}.tgz"
    tar zxvf "${HUB_ARTIFACTS_NAME}.tgz"
    sudo "./${HUB_ARTIFACTS_NAME}/install"
    # Append the alias line as a literal string so that future shells evaluate
    # 'hub alias -s' at startup. (The previous nested double quotes expanded
    # the command substitution at write time and mangled the quoting.)
    echo 'eval "$(hub alias -s)"' >> ~/.bashrc
    rm -rf "${HUB_ARTIFACTS_NAME}"*
  else
    echo "Hub is not installed. Validation on Python Quickstart and MobileGame will be skipped."
  fi
fi
hub version
echo "-----------------Checking Google Cloud SDK-----------------"
if [[ -z "$(command -v gcloud)" ]]; then
  if [[ "${INSTALL_GCLOUD}" = true ]]; then
    echo "-----------------Installing Google Cloud SDK-----------------"
    sudo apt-get install google-cloud-sdk
    gcloud init
    gcloud config set project "${USER_GCP_PROJECT}"
    gcloud config set compute/region "${USER_GCP_REGION}"
    echo "-----------------Setting Up Service Account-----------------"
    if [[ -n "${USER_SERVICE_ACCOUNT_EMAIL}" ]]; then
      # Create a fresh JSON key for the service account and use it as the
      # application-default credentials for the pipelines below.
      SERVICE_ACCOUNT_KEY_JSON=~/google-cloud-sdk/${USER}_json_key.json
      gcloud iam service-accounts keys create "${SERVICE_ACCOUNT_KEY_JSON}" --iam-account "${USER_SERVICE_ACCOUNT_EMAIL}"
      export GOOGLE_APPLICATION_CREDENTIALS=${SERVICE_ACCOUNT_KEY_JSON}
    else
      echo "Missing USER_SERVICE_ACCOUNT_EMAIL from config file. Force terminate."
      exit 1
    fi
  else
    echo "Google Cloud SDK is not installed."
  fi
fi
gcloud --version
echo "-----Initializing gcloud default and application-default credentials-----"
gcloud auth login
gcloud auth application-default login
echo "-----------------Checking gnome-terminal-----------------"
# gnome-terminal is required because the streaming pipelines below are
# launched in separate terminal windows.
if [[ -z "$(command -v gnome-terminal)" ]]; then
  echo "You don't have gnome-terminal installed."
  if [[ "$INSTALL_GNOME_TERMINAL" = true ]]; then
    sudo apt-get install gnome-terminal
  else
    echo "gnome-terminal is not installed. Can't run validation on Python Leaderboard & GameStates. Exiting."
    exit 1
  fi
fi
gnome-terminal --version
echo "-----------------Checking kubectl-----------------"
# kubectl is required by the cross-language Kafka taxi validation (GKE).
if [[ -z "$(command -v kubectl)" ]]; then
  echo "You don't have kubectl installed."
  if [[ "$INSTALL_KUBECTL" = true ]]; then
    sudo apt-get install kubectl
  else
    echo "kubectl is not installed. Can't run validation on Python cross-language Kafka taxi. Exiting."
    exit 1
  fi
fi
kubectl version
# Multi-language quickstarts re-tag RC Docker images locally (see the Docker
# tag section below); warn the user and get explicit confirmation first.
if [[ ("$python_xlang_quickstart" = true) \
    || ("$java_xlang_quickstart" = true) ]]; then
  echo "[Confirmation Required] Multi-language quickstart tests require generating
final Docker tags for several candidate Docker images. This should be safe to do
so since these tags will be overridden during the Docker image finalization step
of the Beam release.
Continue with the release validation ? [y|N]"
  read -r confirmation
  if [[ "$confirmation" != "y" ]]; then
    echo "Cannot continue with the validation. Exiting."
    exit
  fi
fi
echo ""
echo "====================Starting Python Quickstart and MobileGame==================="
echo "This task will create a PR against apache/beam, trigger a jenkins job to run:"
echo "1. Python quickstart validations(batch & streaming)"
echo "2. Python MobileGame validations(UserScore, HourlyTeamScore)"
if [[ "$python_quickstart_mobile_game" = true && -n "$(command -v hub)" ]]; then
  # Push an empty commit to the user's fork so a PR can be opened.
  touch empty_file.json
  git add empty_file.json
  git commit -m "Add empty file in order to create PR" --quiet
  git push -f "${GITHUB_USERNAME}" --quiet
  # Create a test PR
  PR_URL=$(hub pull-request -b apache:${RELEASE_BRANCH} -h apache:${RC_TAG} -F- <<<"[DO NOT MERGE] Run Python RC Validation Tests
Run Python ReleaseCandidate")
  echo "Created $PR_URL"
  # Comment on PR to trigger Python ReleaseCandidate Jenkins job.
  PR_NUM=$(echo "$PR_URL" | sed 's/.*apache\/beam\/pull\/\([0-9]*\).*/\1/')
  hub api "repos/apache/beam/issues/$PR_NUM/comments" --raw-field "body=Run Python ReleaseCandidate" > /dev/null
  echo ""
  echo "[NOTE] If there is no jenkins job started, please comment on $PR_URL with: Run Python ReleaseCandidate"
else
  echo "* Skipping Python Quickstart and MobileGame. Hub is required."
fi
# TODO(https://github.com/apache/beam/issues/21193) Run the remaining tests on Jenkins.
echo ""
echo "====================Starting Python Leaderboard & GameStates Validations==============="
# Gate: run only when at least one of the four pipeline flags is true AND
# gnome-terminal is available (each streaming pipeline runs in its own window).
if [[ ("$python_leaderboard_direct" = true \
|| "$python_leaderboard_dataflow" = true \
|| "$python_gamestats_direct" = true \
|| "$python_gamestats_dataflow" = true) \
&& ! -z `which gnome-terminal` ]]; then
cd ${LOCAL_BEAM_DIR}
echo "---------------------Downloading Python Staging RC----------------------------"
wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.tar.gz
wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.tar.gz.sha512
if [[ ! -f apache-beam-${RELEASE_VER}.tar.gz ]]; then
{ echo "Fail to download Python Staging RC files." ;exit 1; }
fi
echo "--------------------------Verifying Hashes------------------------------------"
sha512sum -c apache-beam-${RELEASE_VER}.tar.gz.sha512
echo "--------------------------Updating ~/.m2/settings.xml-------------------------"
cd ~
if [[ ! -d .m2 ]]; then
mkdir .m2
fi
cd .m2
# Back up any pre-existing Maven settings; clean_up restores it on exit.
if [[ -f ~/.m2/settings.xml ]]; then
mv settings.xml $BACKUP_M2
fi
# Generate a settings.xml whose default-active profile points Maven at the
# RC staging repository (${REPO_URL}).
touch settings.xml
echo "<settings>" >> settings.xml
echo " <profiles>" >> settings.xml
echo " <profile>" >> settings.xml
echo " <id>release-repo</id>" >> settings.xml
echo " <activation>" >> settings.xml
echo " <activeByDefault>true</activeByDefault>" >> settings.xml
echo " </activation>" >> settings.xml
echo " <repositories>" >> settings.xml
echo " <repository>" >> settings.xml
echo " <id>Release ${RELEASE_VER} RC${RC_NUM}</id>" >> settings.xml
echo " <name>Release ${RELEASE_VER} RC${RC_NUM}</name>" >> settings.xml
echo " <url>${REPO_URL}</url>" >> settings.xml
echo " </repository>" >> settings.xml
echo " </repositories>" >> settings.xml
echo " </profile>" >> settings.xml
echo " </profiles>" >> settings.xml
echo "</settings>" >> settings.xml
echo "-----------------------Setting up Shell Env Vars------------------------------"
set_bashrc
echo "----------------------Starting Pubsub Java Injector--------------------------"
cd ${LOCAL_BEAM_DIR}
# Generate the word-count-beam example project from the RC archetype; it
# contains the Injector class run in the terminal below.
mvn archetype:generate \
-DarchetypeGroupId=org.apache.beam \
-DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
-DarchetypeVersion=${RELEASE_VER} \
-DgroupId=org.example \
-DartifactId=word-count-beam \
-Dversion="0.1" \
-Dpackage=org.apache.beam.examples \
-DinteractiveMode=false \
-DarchetypeCatalog=internal
# Create a pubsub topic as a input source shared to all Python pipelines.
SHARED_PUBSUB_TOPIC=leader_board-${USER}-python-topic-$(date +%m%d)_$RANDOM
gcloud pubsub topics create --project=${USER_GCP_PROJECT} ${SHARED_PUBSUB_TOPIC}
cd word-count-beam
echo "A new terminal will pop up and start a java top injector."
# The injector publishes game events to the shared Pub/Sub topic until it is
# stopped manually (see the final steps printed by clean_up).
gnome-terminal -x sh -c \
"echo '******************************************************';
echo '* Running Pubsub Java Injector';
echo '******************************************************';
mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.complete.game.injector.Injector \
-Dexec.args='${USER_GCP_PROJECT} ${SHARED_PUBSUB_TOPIC} none';
exec bash"
# Run Leaderboard & GameStates pipelines under multiple versions of Python
cd ${LOCAL_BEAM_DIR}
for py_version in "${PYTHON_VERSIONS_TO_VALIDATE[@]}"
do
rm -rf ./beam_env_${py_version}
echo "--------------Setting up virtualenv with $py_version interpreter----------------"
$py_version -m venv beam_env_${py_version}
. ./beam_env_${py_version}/bin/activate
pip install --upgrade pip setuptools wheel
echo "--------------------------Installing Python SDK-------------------------------"
pip install apache-beam-${RELEASE_VER}.tar.gz[gcp]
echo "----------------Starting Leaderboard with DirectRunner-----------------------"
if [[ "$python_leaderboard_direct" = true ]]; then
# Fresh BigQuery dataset per run; its name is appended to ~/.bashrc so the
# user can inspect it later from a new shell.
LEADERBOARD_DIRECT_DATASET=${USER}_python_validations_$(date +%m%d)_$RANDOM
bq mk --project_id=${USER_GCP_PROJECT} ${LEADERBOARD_DIRECT_DATASET}
echo "export LEADERBOARD_DIRECT_DATASET=${LEADERBOARD_DIRECT_DATASET}" >> ~/.bashrc
echo "This is a streaming job. This task will be launched in a separate terminal."
gnome-terminal -x sh -c \
"echo '*****************************************************';
echo '* Running Python Leaderboard with DirectRunner';
echo '*****************************************************';
. ${LOCAL_BEAM_DIR}/beam_env_${py_version}/bin/activate
python -m apache_beam.examples.complete.game.leader_board \
--project=${USER_GCP_PROJECT} \
--topic projects/${USER_GCP_PROJECT}/topics/${SHARED_PUBSUB_TOPIC} \
--dataset ${LEADERBOARD_DIRECT_DATASET};
exec bash"
echo "***************************************************************"
echo "* Please wait for at least 5 mins to let results get populated."
echo "* Sleeping for 5 mins"
sleep 5m
echo "***************************************************************"
echo "* How to verify results:"
echo "* 1. Check whether there is any error messages in the task running terminal."
echo "* 2. Goto your BigQuery console and check whether your ${LEADERBOARD_DIRECT_DATASET} has leader_board_users and leader_board_teams table."
echo "* 3. Check whether leader_board_users has data, retrieving BigQuery data as below: "
bq head -n 10 ${LEADERBOARD_DIRECT_DATASET}.leader_board_users
echo "* 4. Check whether leader_board_teams has data, retrieving BigQuery data as below:"
bq head -n 10 ${LEADERBOARD_DIRECT_DATASET}.leader_board_teams
echo "***************************************************************"
else
echo "* Skipping Python Leaderboard with DirectRunner"
fi
echo "----------------Starting Leaderboard with DataflowRunner---------------------"
if [[ "$python_leaderboard_dataflow" = true ]]; then
LEADERBOARD_DF_DATASET=${USER}_python_validations_$(date +%m%d)_$RANDOM
bq mk --project_id=${USER_GCP_PROJECT} ${LEADERBOARD_DF_DATASET}
echo "export LEADERBOARD_DF_DATASET=${LEADERBOARD_DF_DATASET}" >> ~/.bashrc
echo "This is a streaming job. This task will be launched in a separate terminal."
# Same pipeline as above, but submitted to Dataflow using the RC SDK tarball
# as --sdk_location.
gnome-terminal -x sh -c \
"echo '*****************************************************';
echo '* Running Python Leaderboard with DataflowRunner';
echo '*****************************************************';
. ${LOCAL_BEAM_DIR}/beam_env_${py_version}/bin/activate
python -m apache_beam.examples.complete.game.leader_board \
--project=${USER_GCP_PROJECT} \
--region=${USER_GCP_REGION} \
--topic projects/${USER_GCP_PROJECT}/topics/${SHARED_PUBSUB_TOPIC} \
--dataset ${LEADERBOARD_DF_DATASET} \
--runner DataflowRunner \
--temp_location=${USER_GCS_BUCKET}/temp/ \
--sdk_location apache-beam-${RELEASE_VER}.tar.gz; \
exec bash"
echo "***************************************************************"
echo "* Please wait for at least 10 mins to let Dataflow job be launched and results get populated."
echo "* Sleeping for 10 mins"
sleep 10m
echo "* How to verify results:"
echo "* 1. Goto your Dataflow job console and check whether there is any error."
echo "* 2. Goto your BigQuery console and check whether your ${LEADERBOARD_DF_DATASET} has leader_board_users and leader_board_teams table."
echo "* 3. Check whether leader_board_users has data, retrieving BigQuery data as below: "
bq head -n 10 ${LEADERBOARD_DF_DATASET}.leader_board_users
echo "* 4. Check whether leader_board_teams has data, retrieving BigQuery data as below:"
bq head -n 10 ${LEADERBOARD_DF_DATASET}.leader_board_teams
echo "***************************************************************"
else
echo "* Skipping Python Leaderboard with DataflowRunner"
fi
echo "------------------Starting GameStats with DirectRunner-----------------------"
if [[ "$python_gamestats_direct" = true ]]; then
GAMESTATS_DIRECT_DATASET=${USER}_python_validations_$(date +%m%d)_$RANDOM
bq mk --project_id=${USER_GCP_PROJECT} ${GAMESTATS_DIRECT_DATASET}
echo "export GAMESTATS_DIRECT_DATASET=${GAMESTATS_DIRECT_DATASET}" >> ~/.bashrc
echo "This is a streaming job. This task will be launched in a separate terminal."
echo "Streaming job is running with fixed_window_duration=${FIXED_WINDOW_DURATION}"
gnome-terminal -x sh -c \
"echo '*****************************************************';
echo '* Running GameStats with DirectRunner';
echo '*****************************************************';
. ${LOCAL_BEAM_DIR}/beam_env_${py_version}/bin/activate
python -m apache_beam.examples.complete.game.game_stats \
--project=${USER_GCP_PROJECT} \
--topic projects/${USER_GCP_PROJECT}/topics/${SHARED_PUBSUB_TOPIC} \
--dataset ${GAMESTATS_DIRECT_DATASET} \
--fixed_window_duration ${FIXED_WINDOW_DURATION}; \
exec bash"
echo "***************************************************************"
echo "* Please wait for at least 25 mins to let results get populated."
echo "* Sleeping for 25mins"
sleep 25m
echo "* How to verify results:"
echo "* 1. Check whether there is any error messages in the task running terminal."
echo "* 2. Goto your BigQuery console and check whether your ${GAMESTATS_DIRECT_DATASET} has game_stats_teams and game_stats_sessions table."
echo "* 3. Check whether game_stats_teams has data, retrieving BigQuery data as below: "
bq head -n 10 ${GAMESTATS_DIRECT_DATASET}.game_stats_teams
echo "* 4. Check whether game_stats_sessions has data, retrieving BigQuery data as below:"
bq head -n 10 ${GAMESTATS_DIRECT_DATASET}.game_stats_sessions
echo "***************************************************************"
else
echo "* Skipping Python GameStats with DirectRunner"
fi
echo "-------------------Starting GameStats with DataflowRunner--------------------"
if [[ "$python_gamestats_dataflow" = true ]]; then
GAMESTATS_DF_DATASET=${USER}_python_validations_$(date +%m%d)_$RANDOM
bq mk --project_id=${USER_GCP_PROJECT} ${GAMESTATS_DF_DATASET}
echo "export GAMESTATS_DF_DATASET=${GAMESTATS_DF_DATASET}" >> ~/.bashrc
echo "This is a streaming job. This task will be launched in a separate terminal."
echo "Streaming job is running with fixed_window_duration=${FIXED_WINDOW_DURATION}"
gnome-terminal -x sh -c \
"echo '*****************************************************';
echo '* Running GameStats with DataflowRunner';
echo '*****************************************************';
. ${LOCAL_BEAM_DIR}/beam_env_${py_version}/bin/activate
python -m apache_beam.examples.complete.game.game_stats \
--project=${USER_GCP_PROJECT} \
--region=${USER_GCP_REGION} \
--topic projects/${USER_GCP_PROJECT}/topics/${SHARED_PUBSUB_TOPIC} \
--dataset ${GAMESTATS_DF_DATASET} \
--runner DataflowRunner \
--temp_location=${USER_GCS_BUCKET}/temp/ \
--sdk_location apache-beam-${RELEASE_VER}.tar.gz \
--fixed_window_duration ${FIXED_WINDOW_DURATION}; exec bash"
echo "***************************************************************"
echo "* Please wait for at least 30 mins to let results get populated."
echo "* Sleeping for 30 mins"
sleep 30m
echo "* How to verify results:"
echo "* 1. Goto your Dataflow job console and check whether there is any error."
echo "* 2. Goto your BigQuery console and check whether your ${GAMESTATS_DF_DATASET} has game_stats_teams and game_stats_sessions table."
echo "* 3. Check whether game_stats_teams has data, retrieving BigQuery data as below: "
bq head -n 10 ${GAMESTATS_DF_DATASET}.game_stats_teams
echo "* 4. Check whether game_stats_sessions has data, retrieving BigQuery data as below:"
bq head -n 10 ${GAMESTATS_DF_DATASET}.game_stats_sessions
echo "***************************************************************"
else
echo "* Skipping Python GameStats with DataflowRunner"
fi
done # Loop over Python versions.
else
echo "* Skipping Python Leaderboard & GameStates Validations"
fi
# Setting up Docker images for multi-language quickstart tests.
# Locally re-tags the RC candidate SDK images with the final release tag,
# since the quickstart examples expect images tagged with the release version.
if [[ ("$python_xlang_quickstart" = true) \
    || ("$java_xlang_quickstart" = true) ]]; then
  echo ""
  echo "====================Generating Docker tags for multi-language quickstart tests==============="
  RC_DOCKER_TAG=${RELEASE_VER}rc${RC_NUM}
  FINAL_DOCKER_TAG=${RELEASE_VER}
  for py_version in "${PYTHON_VERSIONS_TO_VALIDATE[@]}"
  do
    PYTHON_DOCKER_IMAGE_REPO="apache/beam_${py_version}_sdk"
    PYTHON_RC_DOCKER_IMAGE="${PYTHON_DOCKER_IMAGE_REPO}:${RC_DOCKER_TAG}"
    PYTHON_FINAL_DOCKER_IMAGE="${PYTHON_DOCKER_IMAGE_REPO}:${FINAL_DOCKER_TAG}"
    # Pull the RC image, then alias it under the final release tag.
    docker pull "${PYTHON_RC_DOCKER_IMAGE}"
    echo "Creating Docker tag ${FINAL_DOCKER_TAG} from image ${PYTHON_RC_DOCKER_IMAGE}"
    docker tag "${PYTHON_RC_DOCKER_IMAGE}" "${PYTHON_FINAL_DOCKER_IMAGE}"
  done
  JAVA_DOCKER_IMAGE_REPO=apache/beam_java11_sdk # Using the default Java version.
  JAVA_RC_DOCKER_IMAGE="${JAVA_DOCKER_IMAGE_REPO}:${RC_DOCKER_TAG}"
  JAVA_FINAL_DOCKER_IMAGE="${JAVA_DOCKER_IMAGE_REPO}:${FINAL_DOCKER_TAG}"
  docker pull "${JAVA_RC_DOCKER_IMAGE}"
  echo "Creating Docker tag ${FINAL_DOCKER_TAG} from image ${JAVA_RC_DOCKER_IMAGE}"
  docker tag "${JAVA_RC_DOCKER_IMAGE}" "${JAVA_FINAL_DOCKER_IMAGE}"
fi
echo ""
echo "====================Starting Python Multi-language Quickstart Validations==============="
# Gate: python_xlang_quickstart flag plus gnome-terminal (the Java expansion
# service runs in its own window) and kubectl.
if [[ ("$python_xlang_quickstart" = true) \
&& ! -z `which gnome-terminal` && ! -z `which kubectl` ]]; then
cd ${LOCAL_BEAM_DIR}
echo "---------------------Downloading Python Staging RC----------------------------"
wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.tar.gz
wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.tar.gz.sha512
if [[ ! -f apache-beam-${RELEASE_VER}.tar.gz ]]; then
{ echo "Failed to download Python Staging RC files." ;exit 1; }
fi
echo "--------------------------Verifying Hashes------------------------------------"
sha512sum -c apache-beam-${RELEASE_VER}.tar.gz.sha512
`which pip` install --upgrade pip
`which pip` install --upgrade setuptools
echo "-----------------------Setting up Shell Env Vars------------------------------"
set_bashrc
# Run Python Multi-language pipelines under multiple versions of Python using DirectRunner
cd ${LOCAL_BEAM_DIR}
for py_version in "${PYTHON_VERSIONS_TO_VALIDATE[@]}"
do
rm -rf ./beam_env_${py_version}
echo "--------------Setting up virtualenv with $py_version interpreter----------------"
$py_version -m venv beam_env_${py_version}
. ./beam_env_${py_version}/bin/activate
pip install --upgrade pip setuptools wheel
ln -s ${LOCAL_BEAM_DIR}/sdks beam_env_${py_version}/lib/sdks
echo "--------------------------Installing Python SDK-------------------------------"
pip install apache-beam-${RELEASE_VER}.tar.gz
echo '************************************************************';
echo '* Running Python Multi-language Quickstart with DirectRunner';
echo '************************************************************';
# File names used by the quickstart: input, raw pipeline output, expected
# output, and a sorted copy of the output used for comparison.
PYTHON_MULTILANG_QUICKSTART_FILE_PREFIX=python_multilang_quickstart
PYTHON_MULTILANG_QUICKSTART_INPUT_FILE_NAME=${PYTHON_MULTILANG_QUICKSTART_FILE_PREFIX}_input
PYTHON_MULTILANG_QUICKSTART_OUTPUT_FILE_NAME=${PYTHON_MULTILANG_QUICKSTART_FILE_PREFIX}_output
PYTHON_MULTILANG_QUICKSTART_EXPECTED_OUTPUT_FILE_NAME=${PYTHON_MULTILANG_QUICKSTART_FILE_PREFIX}_expected_output
PYTHON_MULTILANG_QUICKSTART_SORTED_OUTPUT_FILE_NAME=${PYTHON_MULTILANG_QUICKSTART_FILE_PREFIX}_sorted_output
# Cleaning up data from any previous runs.
# (rm errors here are harmless if the files do not exist; the script does not
# run under 'set -e'.)
rm ${PYTHON_MULTILANG_QUICKSTART_FILE_PREFIX}*
rm ./beam-examples-multi-language-${RELEASE_VER}.jar
# Generating an input file.
# Each input word is expected to come back from the pipeline as
# 'python:java:<word>'.
input_data=( aaa bbb ccc ddd eee)
touch $PYTHON_MULTILANG_QUICKSTART_INPUT_FILE_NAME
touch $PYTHON_MULTILANG_QUICKSTART_EXPECTED_OUTPUT_FILE_NAME
for item in ${input_data[*]}
do
echo $item >> $PYTHON_MULTILANG_QUICKSTART_INPUT_FILE_NAME
echo python:java:$item >> $PYTHON_MULTILANG_QUICKSTART_EXPECTED_OUTPUT_FILE_NAME
done
# Downloading the expansion service jar.
wget ${REPO_URL}/org/apache/beam/beam-examples-multi-language/${RELEASE_VER}/beam-examples-multi-language-${RELEASE_VER}.jar
JAVA_EXPANSION_SERVICE_PORT=33333
# Starting up the expansion service in a separate shell.
echo "A new terminal will pop up and start a java expansion service."
gnome-terminal -x sh -c \
"echo '******************************************************';
echo '* Running Java expansion service in port ${JAVA_EXPANSION_SERVICE_PORT}';
echo '******************************************************';
java -jar ./beam-examples-multi-language-${RELEASE_VER}.jar ${JAVA_EXPANSION_SERVICE_PORT};
exec bash"
echo "Sleeping 10 seconds for the expansion service to start up."
sleep 10s
# Running the pipeline
python ${LOCAL_BEAM_DIR}/examples/multi-language/python/addprefix.py \
--runner DirectRunner \
--environment_type=DOCKER \
--input $PYTHON_MULTILANG_QUICKSTART_INPUT_FILE_NAME \
--output $PYTHON_MULTILANG_QUICKSTART_OUTPUT_FILE_NAME \
--expansion_service_port $JAVA_EXPANSION_SERVICE_PORT
# Validating output
# The output may be sharded across several files; concatenate every shard and
# sort to get a deterministic ordering before comparing against the expected
# file (which was written in sorted input order).
cat ${PYTHON_MULTILANG_QUICKSTART_OUTPUT_FILE_NAME}* | sort >> ${PYTHON_MULTILANG_QUICKSTART_SORTED_OUTPUT_FILE_NAME}
if cmp --silent -- $PYTHON_MULTILANG_QUICKSTART_EXPECTED_OUTPUT_FILE_NAME $PYTHON_MULTILANG_QUICKSTART_SORTED_OUTPUT_FILE_NAME; then
echo "Successfully validated Python multi-language quickstart example. No additional manual validation needed."
else
echo "Python multi-language quickstart output validation failed. Since the output of the pipeline did not match the expected output"
echo "Expected output:\n"
cat $PYTHON_MULTILANG_QUICKSTART_EXPECTED_OUTPUT_FILE_NAME
echo "\n"
echo "Pipeline output:\n"
cat $PYTHON_MULTILANG_QUICKSTART_SORTED_OUTPUT_FILE_NAME
echo "\n"
exit 1
fi
done # Loop over Python versions.
else
echo "* Skipping Python Multi-language Quickstart Validations"
fi
echo ""
echo "====================Starting Java Multi-language Quickstart Validations==============="
# Gate: java_xlang_quickstart flag plus gnome-terminal (the Python job server
# and expansion service each run in their own window) and kubectl.
if [[ ("$java_xlang_quickstart" = true) \
&& ! -z `which gnome-terminal` && ! -z `which kubectl` ]]; then
cd ${LOCAL_BEAM_DIR}
echo "---------------------Downloading Python Staging RC----------------------------"
wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.tar.gz
wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.tar.gz.sha512
if [[ ! -f apache-beam-${RELEASE_VER}.tar.gz ]]; then
{ echo "Failed to download Python Staging RC files." ;exit 1; }
fi
echo "--------------------------Verifying Hashes------------------------------------"
sha512sum -c apache-beam-${RELEASE_VER}.tar.gz.sha512
`which pip` install --upgrade pip
`which pip` install --upgrade setuptools
echo "-----------------------Setting up Shell Env Vars------------------------------"
set_bashrc
# Run Java Multi-language pipelines under multiple versions of Python using DirectRunner
cd ${LOCAL_BEAM_DIR}
for py_version in "${PYTHON_VERSIONS_TO_VALIDATE[@]}"
do
rm -rf ./beam_env_${py_version}
echo "--------------Setting up virtualenv with $py_version interpreter----------------"
$py_version -m venv beam_env_${py_version}
. ./beam_env_${py_version}/bin/activate
pip install --upgrade pip setuptools wheel
ln -s ${LOCAL_BEAM_DIR}/sdks beam_env_${py_version}/lib/sdks
echo "--------------------------Installing Python SDK-------------------------------"
pip install apache-beam-${RELEASE_VER}.tar.gz[dataframe]
# Deactivating in the main shell. We will reactivate the virtual environment new shells
# for the expansion service and the job server.
deactivate
PYTHON_PORTABLE_RUNNER_JOB_SERVER_PORT=44443
PYTHON_EXPANSION_SERVICE_PORT=44444
# Starting up the Job Server
echo "A new terminal will pop up and start a Python PortableRunner job server."
gnome-terminal -x sh -c \
"echo '******************************************************';
echo '* Running Python PortableRunner in port ${PYTHON_PORTABLE_RUNNER_JOB_SERVER_PORT}';
echo '******************************************************';
. ./beam_env_${py_version}/bin/activate;
python -m apache_beam.runners.portability.local_job_service_main -p ${PYTHON_PORTABLE_RUNNER_JOB_SERVER_PORT};
exec bash"
# Starting up the Python expansion service
# NOTE(review): "Portabexpansion" in the banner below looks like a typo for
# "Portable expansion"; left as-is since it is runtime output.
echo "A new terminal will pop up and start a Python expansion service."
gnome-terminal -x sh -c \
"echo '******************************************************';
echo '* Running Python Portabexpansion service in port ${PYTHON_EXPANSION_SERVICE_PORT}';
echo '******************************************************';
. ./beam_env_${py_version}/bin/activate;
python -m apache_beam.runners.portability.expansion_service_main --port=${PYTHON_EXPANSION_SERVICE_PORT} \
--fully_qualified_name_glob=* \
--pickle_library=cloudpickle;
exec bash"
echo "Sleeping 10 seconds for the job server and the expansion service to start up."
sleep 10s
echo '************************************************************';
echo '* Running Java Multi-language Quickstart with DirectRunner';
echo '************************************************************';
JAVA_MULTILANG_QUICKSTART_FILE_PREFIX=java_multilang_quickstart
JAVA_MULTILANG_QUICKSTART_OUTPUT_FILE_NAME=${JAVA_MULTILANG_QUICKSTART_FILE_PREFIX}_output
./gradlew :examples:multi-language:pythonDataframeWordCount -Pver=${RELEASE_VER} -Prepourl=${REPO_URL} --args=" \
--runner=PortableRunner \
--jobEndpoint=localhost:${PYTHON_PORTABLE_RUNNER_JOB_SERVER_PORT} \
--expansionService=localhost:${PYTHON_EXPANSION_SERVICE_PORT} \
--output=${JAVA_MULTILANG_QUICKSTART_OUTPUT_FILE_NAME}"
# We cannot validate local output since
# TODO: Write output to GCS and validate when Python portable runner can forward credentials to GCS appropriately.
# $? below is the exit status of the gradlew invocation above (the intervening
# comment lines do not change it).
java_xlang_quickstart_status=$?
if [[ $java_xlang_quickstart_status -eq 0 ]]; then
echo "Successfully completed Java multi-language quickstart example. No manual validation needed."
else
{ echo "Java multi-language quickstart failed since the pipeline execution failed." ;exit 1; }
fi
done # Loop over Python versions.
else
echo "* Skipping Java Multi-language Quickstart Validations"
fi
echo ""
echo "====================Starting Python Multi-language Validations with DataflowRunner==============="
# Run this whole section only if at least one of the two Dataflow multi-language
# validations is enabled AND both gnome-terminal and kubectl are available.
if [[ ("$python_xlang_kafka_taxi_dataflow" = true
|| "$python_xlang_sql_taxi_dataflow" = true) \
&& ! -z `which gnome-terminal` && ! -z `which kubectl` ]]; then
cd ${LOCAL_BEAM_DIR}
echo "---------------------Downloading Python Staging RC----------------------------"
wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.tar.gz
wget ${PYTHON_RC_DOWNLOAD_URL}/${RELEASE_VER}/python/apache-beam-${RELEASE_VER}.tar.gz.sha512
if [[ ! -f apache-beam-${RELEASE_VER}.tar.gz ]]; then
{ echo "Fail to download Python Staging RC files." ;exit 1; }
fi
echo "--------------------------Verifying Hashes------------------------------------"
# NOTE(review): a hash mismatch is printed but does not abort the script —
# confirm whether this is intentional (the Java path above aborts on failures).
sha512sum -c apache-beam-${RELEASE_VER}.tar.gz.sha512
`which pip` install --upgrade pip
`which pip` install --upgrade setuptools
echo "-----------------------Setting up Shell Env Vars------------------------------"
# set_bashrc is defined earlier in this file; it rewrites ~/.bashrc with the
# release-validation environment variables.
set_bashrc
echo "-----------------------Setting up Kafka Cluster on GKE------------------------"
# $RANDOM suffix avoids collisions with clusters left over from prior runs.
KAFKA_CLUSTER_NAME=xlang-kafka-cluster-$RANDOM
if [[ "$python_xlang_kafka_taxi_dataflow" = true ]]; then
gcloud container clusters create --project=${USER_GCP_PROJECT} --region=${USER_GCP_REGION} --no-enable-ip-alias $KAFKA_CLUSTER_NAME
# Deploy the Kafka manifests bundled with the Beam repo onto the new cluster.
kubectl apply -R -f ${LOCAL_BEAM_DIR}/.test-infra/kubernetes/kafka-cluster
echo "* Please wait for 10 mins to let a Kafka cluster be launched on GKE."
echo "* Sleeping for 10 mins"
sleep 10m
else
echo "* Skipping Kafka cluster setup"
fi
# Run Python multi-language pipelines under multiple versions of Python using Dataflow Runner
cd ${LOCAL_BEAM_DIR}
for py_version in "${PYTHON_VERSIONS_TO_VALIDATE[@]}"
do
# Start from a clean virtualenv for each interpreter version.
rm -rf ./beam_env_${py_version}
echo "--------------Setting up virtualenv with $py_version interpreter----------------"
$py_version -m venv beam_env_${py_version}
. ./beam_env_${py_version}/bin/activate
pip install --upgrade pip setuptools wheel
ln -s ${LOCAL_BEAM_DIR}/sdks beam_env_${py_version}/lib/sdks
echo "--------------------------Installing Python SDK-------------------------------"
# Install the RC tarball with the [gcp] extra needed for Dataflow submission.
pip install apache-beam-${RELEASE_VER}.tar.gz[gcp]
echo "----------------Starting XLang Kafka Taxi with DataflowRunner---------------------"
if [[ "$python_xlang_kafka_taxi_dataflow" = true ]]; then
# External IP of the Kafka broker service exposed by the GKE cluster created
# above; 32400 is the broker's advertised port from the kafka-cluster manifests.
BOOTSTRAP_SERVERS="$(kubectl get svc outside-0 -o jsonpath='{.status.loadBalancer.ingress[0].ip}'):32400"
echo "BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}"
# Date + $RANDOM keep dataset names unique across runs and validators.
KAFKA_TAXI_DF_DATASET=${USER}_python_validations_$(date +%m%d)_$RANDOM
KAFKA_EXPANSION_SERVICE_JAR=${REPO_URL}/org/apache/beam/beam-sdks-java-io-expansion-service/${RELEASE_VER}/beam-sdks-java-io-expansion-service-${RELEASE_VER}.jar
bq mk --project_id=${USER_GCP_PROJECT} ${KAFKA_TAXI_DF_DATASET}
# Persist for the operator's later shells (e.g. manual cleanup/verification).
echo "export BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}" >> ~/.bashrc
echo "export KAFKA_TAXI_DF_DATASET=${KAFKA_TAXI_DF_DATASET}" >> ~/.bashrc
echo "This is a streaming job. This task will be launched in a separate terminal."
# --beam_services overrides the Java IO expansion service jar with the staged
# RC artifact; the triple-escaping survives the sh -c string plus the JSON value.
gnome-terminal -x sh -c \
"echo '*****************************************************';
echo '* Running Python XLang Kafka Taxi with DataflowRunner';
echo '*****************************************************';
. ${LOCAL_BEAM_DIR}/beam_env_${py_version}/bin/activate
python -m apache_beam.examples.kafkataxi.kafka_taxi \
--project=${USER_GCP_PROJECT} \
--region=${USER_GCP_REGION} \
--topic beam-runnerv2 \
--bootstrap_servers ${BOOTSTRAP_SERVERS} \
--bq_dataset ${KAFKA_TAXI_DF_DATASET} \
--runner DataflowRunner \
--num_workers 5 \
--temp_location=${USER_GCS_BUCKET}/temp/ \
--with_metadata \
--beam_services=\"{\\\"sdks:java:io:expansion-service:shadowJar\\\": \\\"${KAFKA_EXPANSION_SERVICE_JAR}\\\"}\" \
--sdk_location apache-beam-${RELEASE_VER}.tar.gz; \
exec bash"
echo "***************************************************************"
echo "* Please wait for at least 20 mins to let Dataflow job be launched and results get populated."
echo "* Sleeping for 20 mins"
sleep 20m
echo "* How to verify results:"
echo "* 1. Goto your Dataflow job console and check whether there is any error."
echo "* 2. Check whether ${KAFKA_TAXI_DF_DATASET}.xlang_kafka_taxi has data, retrieving BigQuery data as below: "
test_output=$(bq head -n 10 ${KAFKA_TAXI_DF_DATASET}.xlang_kafka_taxi)
echo "$test_output"
# The taxi schema includes a passenger_count column, so its presence in the
# sampled rows is used as a smoke check that data reached BigQuery.
if ! grep -q "passenger_count" <<< "$test_output"; then
echo "Couldn't find expected output. Please confirm the output by visiting the console manually."
exit 1
fi
echo "***************************************************************"
else
echo "* Skipping Python XLang Kafka Taxi with DataflowRunner"
fi
echo "----------------Starting XLang SQL Taxi with DataflowRunner---------------------"
if [[ "$python_xlang_sql_taxi_dataflow" = true ]]; then
# Date + $RANDOM keep topic/subscription names unique across runs; note the
# two $RANDOM expansions mean topic and subscription names differ.
SQL_TAXI_TOPIC=${USER}_python_validations_$(date +%m%d)_$RANDOM
SQL_TAXI_SUBSCRIPTION=${USER}_python_validations_$(date +%m%d)_$RANDOM
SQL_EXPANSION_SERVICE_JAR=${REPO_URL}/org/apache/beam/beam-sdks-java-extensions-sql-expansion-service/${RELEASE_VER}/beam-sdks-java-extensions-sql-expansion-service-${RELEASE_VER}.jar
gcloud pubsub topics create --project=${USER_GCP_PROJECT} ${SQL_TAXI_TOPIC}
gcloud pubsub subscriptions create --project=${USER_GCP_PROJECT} --topic=${SQL_TAXI_TOPIC} ${SQL_TAXI_SUBSCRIPTION}
# Persist for the operator's later shells (e.g. manual cleanup/verification).
echo "export SQL_TAXI_TOPIC=${SQL_TAXI_TOPIC}" >> ~/.bashrc
echo "This is a streaming job. This task will be launched in a separate terminal."
# --beam_services overrides the SQL expansion service jar with the staged RC
# artifact; the triple-escaping survives the sh -c string plus the JSON value.
gnome-terminal -x sh -c \
"echo '***************************************************';
echo '* Running Python XLang SQL Taxi with DataflowRunner';
echo '***************************************************';
. ${LOCAL_BEAM_DIR}/beam_env_${py_version}/bin/activate
python -m apache_beam.examples.sql_taxi \
--project=${USER_GCP_PROJECT} \
--region=${USER_GCP_REGION} \
--runner DataflowRunner \
--num_workers 5 \
--temp_location=${USER_GCS_BUCKET}/temp/ \
--output_topic projects/${USER_GCP_PROJECT}/topics/${SQL_TAXI_TOPIC} \
--beam_services=\"{\\\":sdks:java:extensions:sql:expansion-service:shadowJar\\\": \\\"${SQL_EXPANSION_SERVICE_JAR}\\\"}\" \
--sdk_location apache-beam-${RELEASE_VER}.tar.gz; \
exec bash"
echo "***************************************************************"
echo "* Please wait for at least 20 mins to let Dataflow job be launched and results get populated."
echo "* Sleeping for 20 mins"
sleep 20m
echo "* How to verify results:"
echo "* 1. Goto your Dataflow job console and check whether there is any error."
echo "* 2. Check whether your ${SQL_TAXI_SUBSCRIPTION} subscription has data below:"
# run twice since the first execution would return 0 messages
gcloud pubsub subscriptions pull --project=${USER_GCP_PROJECT} --limit=5 ${SQL_TAXI_SUBSCRIPTION}
test_output=$(gcloud pubsub subscriptions pull --project=${USER_GCP_PROJECT} --limit=5 ${SQL_TAXI_SUBSCRIPTION})
echo "$test_output"
# The SQL taxi pipeline emits a ride_status field, so its presence in pulled
# messages is used as a smoke check that the pipeline is producing output.
if ! grep -q "ride_status" <<< "$test_output"; then
echo "Couldn't find expected output. Please confirm the output by visiting the console manually."
exit 1
fi
echo "***************************************************************"
else
echo "* Skipping Python XLang SQL Taxi with DataflowRunner"
fi
done # Loop over Python versions.
else
echo "* Skipping Python Multi-language Dataflow Validations"
fi
# Closing banner: remind the operator that streaming jobs and the cloud
# resources created above must be torn down by hand after verification.
printf '%s\n' \
  "*************************************************************" \
  " NOTE: Streaming pipelines are not automatically canceled. " \
  " Please manually cancel any remaining test pipelines and " \
  " clean up the resources (BigQuery dataset, PubSub topics and " \
  " subscriptions, GKE cluster, etc.) after verification. " \
  "*************************************************************"