blob: 09847a81da317c88db48a1322269c6b145a8f8b9 [file] [log] [blame]
#!/bin/bash
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
set -e
set -v
#######################################
# Print Separators.
# Arguments:
# Info to be printed.
# Outputs:
# Writes info to stdout.
#######################################
function print_separator() {
echo "############################################################################"
echo $1
echo "############################################################################"
}
#######################################
# Update gcloud version.
# Arguments:
# None
#######################################
function update_gcloud() {
curl https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-189.0.0-linux-x86_64.tar.gz \
--output gcloud.tar.gz
tar xf gcloud.tar.gz
./google-cloud-sdk/install.sh --quiet
. ./google-cloud-sdk/path.bash.inc
gcloud components update --quiet || echo 'gcloud components update failed'
gcloud -v
}
#######################################
# Get Python SDK version from sdk/python/apache_beam/version.py.
# Arguments:
# None
# Outputs:
# Writes releasing version to stdout.
# e.g. __version__ = '2.5.0' => 2.5.0
# e.g. __version__ = '2.6.0.dev' => 2.5.0
#######################################
function get_version() {
version=$(awk '/__version__/{print $3}' sdks/python/apache_beam/version.py)
version=$(echo $version | cut -c 2- | rev | cut -c 2- | rev)
if [[ $version = *".dev"* ]]; then
version=$(echo $version | rev | cut -d'.' -f2- | rev)
IFS='.' read -r -a array <<< "$version"
minor_version=$((${array[1]}-1))
version="${array[0]}.$minor_version.${array[2]}"
fi
echo $version
}
#######################################
# Download files including SDK, SHA512 and ASC.
# Globals:
# BEAM_PYTHON_SDK*
# Arguments:
# $1 - SDK type: tar, wheel
# $2 - python interpreter version: python2.7, python3.5, ...
#######################################
function download_files() {
if [[ $1 = *"wheel"* ]]; then
if [[ $2 == "python2.7" ]]; then
BEAM_PYTHON_SDK_WHL="apache_beam-$VERSION*-cp27-cp27mu-manylinux1_x86_64.whl"
elif [[ $2 == "python3.5" ]]; then
BEAM_PYTHON_SDK_WHL="apache_beam-$VERSION*-cp35-cp35m-manylinux1_x86_64.whl"
elif [[ $2 == "python3.6" ]]; then
BEAM_PYTHON_SDK_WHL="apache_beam-$VERSION*-cp36-cp36m-manylinux1_x86_64.whl"
elif [[ $2 == "python3.7" ]]; then
BEAM_PYTHON_SDK_WHL="apache_beam-$VERSION*-cp37-cp37m-manylinux1_x86_64.whl"
elif [[ $2 == "python3.8" ]]; then
BEAM_PYTHON_SDK_WHL="apache_beam-$VERSION*-cp38-cp38m-manylinux1_x86_64.whl"
elif [[ $2 == "python3.9" ]]; then
BEAM_PYTHON_SDK_WHL="apache_beam-$VERSION*-cp39-cp39m-manylinux1_x86_64.whl"
else
echo "Unable to determine a Beam wheel for interpreter version $2."
exit 1
fi
wget -r -l2 --no-parent -nd -A "$BEAM_PYTHON_SDK_WHL*" $RC_STAGING_URL
else
BEAM_PYTHON_SDK_ZIP="apache-beam-$VERSION.zip"
wget -r -l2 --no-parent -nd -A "$BEAM_PYTHON_SDK_ZIP*" $RC_STAGING_URL
fi
}
#######################################
# Stdout python sdk name.
# Globals:
# BEAM_PYTHON_SDK_ZIP
# Arguments:
# $1 - SDK type: tar, wheel
#######################################
function get_sdk_name() {
sdk_name=$BEAM_PYTHON_SDK_ZIP
if [[ $1 = *"wheel"* ]]; then
sdk_name=$(ls | grep "/*.whl$")
fi
echo $sdk_name
}
#######################################
# Stdout sha512 file name.
# Arguments:
# $1 - SDK type: tar, wheel
#######################################
function get_sha512_name() {
if [[ $1 = *"wheel"* ]]; then
echo $(ls | grep "/*.whl.sha512$")
else
echo $(ls | grep "/*.zip.sha512$")
fi
}
#######################################
# Stdout ASC file name.
# Arguments:
# $1 - SDK type: tar, wheel
#######################################
function get_asc_name() {
if [[ $1 = *"wheel"* ]]; then
echo $(ls | grep "/*.whl.asc$")
else
echo $(ls | grep "/*.zip.asc$")
fi
}
#######################################
# Create a new virtualenv and install the SDK
# Globals:
# BEAM_PYTHON_SDK
# Arguments:
# $1 - SDK type: tar, wheel
# $2 - python interpreter version: [python2.7, python3.5, ...]
#######################################
function install_sdk() {
sdk_file=$(get_sdk_name $1)
print_separator "Creating new virtualenv with $2 interpreter and installing the SDK from $sdk_file."
gsutil version -l
rm -rf ./temp_virtualenv_${2}
virtualenv temp_virtualenv_${2} -p $2
. temp_virtualenv_${2}/bin/activate
gcloud_version=$(gcloud --version | head -1 | awk '{print $4}')
if [[ "$gcloud_version" < "189" ]]; then
update_gcloud
fi
pip install google-compute-engine
pip install $sdk_file[gcp]
}
#######################################
# Publish data to Pubsub topic for streaming wordcount examples.
# Arguments:
# None
#######################################
function run_pubsub_publish(){
words=("hello world!", "I like cats!", "Python", "hello Python", "hello Python")
for word in ${words[@]}; do
gcloud pubsub topics publish $PUBSUB_TOPIC1 --message "$word"
done
sleep 10
}
#######################################
# Pull data from Pubsub.
# Arguments:
# None
#######################################
function run_pubsub_pull() {
gcloud pubsub subscriptions pull --project=$PROJECT_ID $PUBSUB_SUBSCRIPTION --limit=100 --auto-ack
}
#######################################
# Create Pubsub topics and subscription.
# Arguments:
# None
#######################################
function create_pubsub() {
gcloud pubsub topics create --project=$PROJECT_ID $PUBSUB_TOPIC1
gcloud pubsub topics create --project=$PROJECT_ID $PUBSUB_TOPIC2
gcloud pubsub subscriptions create --project=$PROJECT_ID $PUBSUB_SUBSCRIPTION --topic $PUBSUB_TOPIC2
}
#######################################
# Remove Pubsub topics and subscription.
# Arguments:
# None
#######################################
function cleanup_pubsub() {
# Suppress error since topic/subscription may not exist
gcloud pubsub topics delete --project=$PROJECT_ID $PUBSUB_TOPIC1 2> /dev/null
gcloud pubsub topics delete --project=$PROJECT_ID $PUBSUB_TOPIC2 2> /dev/null
gcloud pubsub subscriptions delete --project=$PROJECT_ID $PUBSUB_SUBSCRIPTION 2> /dev/null
}
#######################################
# Verify results of streaming_wordcount.
# Arguments:
# $1 - runner type: DirectRunner, DataflowRunner
# $2 - pid: the pid of running pipeline
# $3 - running_job (DataflowRunner only): the job id of streaming pipeline running on DataflowRunner
#######################################
function verify_steaming_result() {
retry=3
should_see="Python: "
while(( $retry > 0 )); do
pull_result=$(run_pubsub_pull)
if [[ $pull_result = *"$should_see"* ]]; then
echo "SUCCEED: The streaming wordcount example running successfully on $1."
break
else
if [[ $retry > 0 ]]; then
retry=$(($retry-1))
echo "retry left: $retry"
sleep 15
else
echo "ERROR: The streaming wordcount example failed on $1."
cleanup_pubsub
kill -9 $2
if [[ $1 = "DataflowRunner" ]]; then
gcloud dataflow jobs cancel $3
fi
complete "failed when running streaming wordcount example with $1."
exit 1
fi
fi
done
}
#######################################
# Verify results of user_score.
# Globals:
# BUCKET_NAME
# Arguments:
# $1: Runner - direct, dataflow
#######################################
function verify_user_score() {
expected_output_file_name="$USERSCORE_OUTPUT_PREFIX-$1-runner.txt"
actual_output_files=$(ls)
if [[ $1 = *"dataflow"* ]]; then
actual_output_files=$(gsutil ls gs://$BUCKET_NAME)
expected_output_file_name="gs://$BUCKET_NAME/$expected_output_file_name"
fi
echo $actual_output_files
if [[ $actual_output_files != *$expected_output_file_name* ]]
then
echo "ERROR: The userscore example failed on $1-runner".
complete "failed when running userscore example with $1-runner."
exit 1
fi
if [[ $1 = *"dataflow"* ]]; then
gsutil rm $expected_output_file_name*
fi
echo "SUCCEED: user_score successfully run on $1-runner."
}
#######################################
# Verify results of hourly_team_score.
# Globals:
# DATASET
# Arguments:
# Runner - direct, dataflow
#######################################
function verify_hourly_team_score() {
retry=3
should_see='AntiqueBrassPlatypus'
while(( $retry >= 0 )); do
if [[ $retry > 0 ]]; then
bq_pull_result=$(bq head -n 500 $DATASET.hourly_team_score_python_$1)
if [[ $bq_pull_result = *"$should_see"* ]]; then
echo "SUCCEED: hourly_team_score example successful run on $1-runner"
break
else
retry=$(($retry-1))
echo "Did not find team scores, retry left: $retry"
sleep 15
fi
else
echo "FAILED: HourlyTeamScore example failed running on $1-runner. \
Did not found scores of team $should_see in $DATASET.leader_board"
complete "FAILED"
exit 1
fi
done
}
# Python RC configurations
VERSION=$(get_version)
RC_STAGING_URL="https://dist.apache.org/repos/dist/dev/beam/$VERSION/python"
# Cloud Configurations
PROJECT_ID='apache-beam-testing'
BUCKET_NAME='temp-storage-for-release-validation-tests/nightly-snapshot-validation'
TEMP_DIR='/tmp'
DATASET='beam_postrelease_mobile_gaming'
NUM_WORKERS=1
WORDCOUNT_OUTPUT='wordcount_direct.txt'
PUBSUB_TOPIC1='wordstream-python-topic-1'
PUBSUB_TOPIC2='wordstream-python-topic-2'
PUBSUB_SUBSCRIPTION='wordstream-python-sub2'
# Mobile Gaming Configurations
DATASET='beam_postrelease_mobile_gaming'
USERSCORE_OUTPUT_PREFIX='python-userscore_result'
GAME_INPUT_DATA='gs://dataflow-samples/game/5000_gaming_data.csv'