#!/bin/bash
#
#    Licensed to the Apache Software Foundation (ASF) under one or more
#    contributor license agreements.  See the NOTICE file distributed with
#    this work for additional information regarding copyright ownership.
#    The ASF licenses this file to You under the Apache License, Version 2.0
#    (the "License"); you may not use this file except in compliance with
#    the License.  You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#


# Abort the script on the first command that fails outside a conditional context.
set -e
# Verbose mode: echo each line of shell input as it is read.
set -v

#######################################
# Print a message framed by separator lines.
# Arguments:
#   $1 - Info to be printed (printed verbatim; whitespace and glob
#        characters are preserved).
# Outputs:
#   Writes the framed info to stdout.
#######################################
function print_separator() {
  local -r separator="############################################################################"
  # printf + quoting: `echo $1` would collapse runs of spaces, expand globs
  # and swallow messages such as "-n".
  printf '%s\n%s\n%s\n' "$separator" "$1" "$separator"
}


#######################################
# Download, install and activate a Google Cloud SDK release, then update
# its components.
# Arguments:
#   $1 - (optional) Cloud SDK version to download; defaults to 189.0.0.
# Outputs:
#   Download progress and the resulting `gcloud -v` output.
#######################################
function update_gcloud() {
  local -r sdk_version=${1:-189.0.0}
  # --fail makes curl exit non-zero on HTTP errors instead of saving the
  # error page, so `set -e` stops here rather than at a confusing tar error.
  curl --fail --location \
    "https://dl.google.com/dl/cloudsdk/channels/rapid/downloads/google-cloud-sdk-${sdk_version}-linux-x86_64.tar.gz" \
    --output gcloud.tar.gz
  tar xf gcloud.tar.gz
  ./google-cloud-sdk/install.sh --quiet
  # Put the freshly installed gcloud on PATH for the current shell.
  # shellcheck source=/dev/null
  . ./google-cloud-sdk/path.bash.inc
  # Best effort: a failed component update should not abort the script.
  gcloud components update --quiet || echo 'gcloud components update failed'
  gcloud -v
}


#######################################
# Get the Python SDK release version from sdks/python/apache_beam/version.py
# (path relative to the current directory).
# Arguments:
#   None
# Outputs:
#   Writes releasing version to stdout.
#   e.g. __version__ = '2.5.0' => 2.5.0
#   e.g. __version__ = '2.6.0.dev' => 2.5.0
# Returns:
#   1 (with a message on stderr) if a .dev version has minor 0, because the
#   previous release cannot be derived from it.
#######################################
function get_version() {
  local version
  # Only the first line that starts with __version__ is used.
  version=$(awk '/^__version__/{print $3; exit}' sdks/python/apache_beam/version.py)
  # Strip the quotes of the Python string literal: '2.5.0' -> 2.5.0
  version=${version#[\'\"]}
  version=${version%[\'\"]}
  if [[ $version == *".dev"* ]]; then
    # Drop the trailing dev component: 2.6.0.dev -> 2.6.0
    version=${version%.*}
    local -a parts
    IFS='.' read -r -a parts <<< "$version"
    if [[ ! ${parts[1]} =~ ^[0-9]+$ ]] || (( 10#${parts[1]} < 1 )); then
      printf 'Cannot derive release version from dev version %s\n' "$version" >&2
      return 1
    fi
    # A dev version is the next release; validate the previous one.
    version="${parts[0]}.$(( 10#${parts[1]} - 1 )).${parts[2]}"
  fi
  printf '%s\n' "$version"
}


#######################################
# Download files including SDK, SHA512 and ASC from the RC staging area.
# Globals:
#   VERSION, RC_STAGING_URL (read)
#   BEAM_PYTHON_SDK_WHL or BEAM_PYTHON_SDK_ZIP (written)
# Arguments:
#   $1 - SDK type: tar, wheel
#   $2 - python interpreter version: python2.7, python3.5, ...
# Outputs:
#   Downloads matching files into the current directory. Exits 1 with a
#   message on stderr when no wheel tag is known for the interpreter.
#######################################
function download_files() {
  if [[ $1 == *"wheel"* ]]; then
    local abi_tag
    # Python 3.8 removed the "m" (pymalloc) ABI flag, so wheels for 3.8+
    # are tagged cpXY-cpXY rather than cpXY-cpXYm.
    case "$2" in
      python2.7) abi_tag="cp27-cp27mu" ;;
      python3.5) abi_tag="cp35-cp35m" ;;
      python3.6) abi_tag="cp36-cp36m" ;;
      python3.7) abi_tag="cp37-cp37m" ;;
      python3.8) abi_tag="cp38-cp38" ;;
      python3.9) abi_tag="cp39-cp39" ;;
      *)
        echo "Unable to determine a Beam wheel for interpreter version $2." >&2
        exit 1
        ;;
    esac
    BEAM_PYTHON_SDK_WHL="apache_beam-$VERSION*-${abi_tag}-manylinux1_x86_64.whl"
    # Trailing '*' also accepts the .sha512 and .asc companions.
    wget -r -l2 --no-parent -nd -A "$BEAM_PYTHON_SDK_WHL*" "$RC_STAGING_URL"
  else
    BEAM_PYTHON_SDK_ZIP="apache-beam-$VERSION.zip"
    wget -r -l2 --no-parent -nd -A "$BEAM_PYTHON_SDK_ZIP*" "$RC_STAGING_URL"
  fi
}


#######################################
# Print the Python SDK file name.
# Globals:
#   BEAM_PYTHON_SDK_ZIP (read)
# Arguments:
#   $1 - SDK type: tar, wheel
# Outputs:
#   For wheels, the *.whl files in the current directory (space separated,
#   empty if none); otherwise the value of BEAM_PYTHON_SDK_ZIP.
#######################################
function get_sdk_name() {
  local sdk_name=$BEAM_PYTHON_SDK_ZIP
  if [[ $1 == *"wheel"* ]]; then
    # Glob instead of parsing `ls`; the existence check drops the literal
    # pattern left behind when nothing matches.
    local -a wheels=()
    local candidate
    for candidate in *.whl; do
      if [[ -e $candidate ]]; then
        wheels+=("$candidate")
      fi
    done
    local IFS=' '
    sdk_name="${wheels[*]}"
  fi
  printf '%s\n' "$sdk_name"
}


#######################################
# Print the SHA512 checksum file name.
# Arguments:
#   $1 - SDK type: tar, wheel
# Outputs:
#   The *.whl.sha512 (wheel) or *.zip.sha512 (otherwise) files in the
#   current directory, space separated; empty if none.
#######################################
function get_sha512_name() {
  local suffix=zip
  if [[ $1 == *"wheel"* ]]; then
    suffix=whl
  fi
  # Glob instead of parsing `ls`.
  local -a matches=()
  local candidate
  for candidate in *."$suffix".sha512; do
    if [[ -e $candidate ]]; then
      matches+=("$candidate")
    fi
  done
  local IFS=' '
  printf '%s\n' "${matches[*]}"
}


#######################################
# Print the ASC signature file name.
# Arguments:
#   $1 - SDK type: tar, wheel
# Outputs:
#   The *.whl.asc (wheel) or *.zip.asc (otherwise) files in the current
#   directory, space separated; empty if none.
#######################################
function get_asc_name() {
  local suffix=zip
  if [[ $1 == *"wheel"* ]]; then
    suffix=whl
  fi
  # Glob instead of parsing `ls`.
  local -a matches=()
  local candidate
  for candidate in *."$suffix".asc; do
    if [[ -e $candidate ]]; then
      matches+=("$candidate")
    fi
  done
  local IFS=' '
  printf '%s\n' "${matches[*]}"
}


#######################################
# Create a new virtualenv and install the SDK (with the [gcp] extra) into it.
# Globals:
#   BEAM_PYTHON_SDK_ZIP (read, via get_sdk_name)
#   sdk_file (written)
# Arguments:
#   $1 - SDK type: tar, wheel
#   $2 - python interpreter version: [python2.7, python3.5, ...]
#######################################
function install_sdk() {
  sdk_file=$(get_sdk_name "$1")
  print_separator "Creating new virtualenv with $2 interpreter and installing the SDK from $sdk_file."
  gsutil version -l
  rm -rf "./temp_virtualenv_${2}"
  virtualenv "temp_virtualenv_${2}" -p "$2"
  # shellcheck source=/dev/null
  . "temp_virtualenv_${2}/bin/activate"

  # Compare the gcloud major version numerically. The previous string
  # comparison treated e.g. "99" as newer than "189". An unparsable or
  # missing version also triggers an update.
  local gcloud_version gcloud_major
  gcloud_version=$(gcloud --version | head -1 | awk '{print $4}')
  gcloud_major=${gcloud_version%%.*}
  if [[ ! $gcloud_major =~ ^[0-9]+$ ]] || (( 10#$gcloud_major < 189 )); then
    update_gcloud
  fi
  pip install google-compute-engine
  # Quoted so the shell does not treat "[gcp]" as a glob bracket expression.
  pip install "${sdk_file}[gcp]"
}


#######################################
# Publish data to Pubsub topic for streaming wordcount examples.
# Globals:
#   PROJECT_ID, PUBSUB_TOPIC1 (read)
# Arguments:
#   None
# Outputs:
#   Publishes one message per phrase, then sleeps 10 seconds (presumably to
#   give the running pipeline time to consume them).
#######################################
function run_pubsub_publish(){
  # Space-separated elements with no commas. The old literal embedded commas
  # in each element, and unquoted expansion split phrases into single words.
  local -a words=('hello world!' 'I like cats!' 'Python' 'hello Python' 'hello Python')
  local word
  for word in "${words[@]}"; do
    gcloud pubsub topics publish --project="$PROJECT_ID" "$PUBSUB_TOPIC1" --message "$word"
  done
  sleep 10
}

#######################################
# Pull (and acknowledge) up to 100 messages from the Pubsub subscription.
# Globals:
#   PROJECT_ID, PUBSUB_SUBSCRIPTION (read)
# Arguments:
#   None
# Outputs:
#   gcloud's pull output on stdout.
#######################################
function run_pubsub_pull() {
  gcloud pubsub subscriptions pull --project="$PROJECT_ID" "$PUBSUB_SUBSCRIPTION" --limit=100 --auto-ack
}


#######################################
# Create Pubsub topics and a subscription on the second topic.
# Globals:
#   PROJECT_ID, PUBSUB_TOPIC1, PUBSUB_TOPIC2, PUBSUB_SUBSCRIPTION (read)
# Arguments:
#   None
#######################################
function create_pubsub() {
  gcloud pubsub topics create --project="$PROJECT_ID" "$PUBSUB_TOPIC1"
  gcloud pubsub topics create --project="$PROJECT_ID" "$PUBSUB_TOPIC2"
  gcloud pubsub subscriptions create --project="$PROJECT_ID" "$PUBSUB_SUBSCRIPTION" --topic "$PUBSUB_TOPIC2"
}


#######################################
# Remove Pubsub topics and subscription.
# Globals:
#   PROJECT_ID, PUBSUB_TOPIC1, PUBSUB_TOPIC2, PUBSUB_SUBSCRIPTION (read)
# Arguments:
#   None
# Returns:
#   Always 0; deletion is best effort.
#######################################
function cleanup_pubsub() {
  # Suppress errors and pass quietly if a topic/subscription does not exist;
  # we don't want the script to be interrupted in that case.
  gcloud pubsub topics delete --project="$PROJECT_ID" "$PUBSUB_TOPIC1" 2> /dev/null || true
  gcloud pubsub topics delete --project="$PROJECT_ID" "$PUBSUB_TOPIC2" 2> /dev/null || true
  gcloud pubsub subscriptions delete --project="$PROJECT_ID" "$PUBSUB_SUBSCRIPTION" 2> /dev/null || true
}


#######################################
# Verify results of streaming_wordcount by polling the output subscription.
# Arguments:
#   $1 - runner type: DirectRunner, DataflowRunner
#   $2 - pid: the pid of running pipeline
#   $3 - running_job (DataflowRunner only): the job id of streaming pipeline running on DataflowRunner
# Outputs:
#   Returns 0 once a pulled result contains "Python: ". After 3 unsuccessful
#   pulls it cleans up Pubsub, kills the pipeline (and cancels the Dataflow
#   job), calls `complete`, and exits 1.
#######################################
function verify_steaming_result() {
  local retry=3
  local -r should_see="Python: "
  local pull_result
  while (( retry > 0 )); do
    pull_result=$(run_pubsub_pull)
    if [[ $pull_result == *"$should_see"* ]]; then
      echo "SUCCEED: The streaming wordcount example running successfully on $1."
      return 0
    fi
    retry=$((retry - 1))
    echo "retry left: $retry"
    if (( retry > 0 )); then
      sleep 15
    fi
  done

  # Retries exhausted. The original loop could never reach this failure
  # path, so a failed run used to fall through silently with status 0.
  echo "ERROR: The streaming wordcount example failed on $1."
  cleanup_pubsub
  # Keep tearing down even if the process is already gone.
  kill -9 "$2" 2> /dev/null || true
  if [[ $1 == "DataflowRunner" ]]; then
    gcloud dataflow jobs cancel "$3" || true
  fi
  complete "failed when running streaming wordcount example with $1."
  exit 1
}


#######################################
# Verify results of user_score.
# Globals:
#   BUCKET_NAME, USERSCORE_OUTPUT_PREFIX (read)
# Arguments:
#   $1: Runner - direct, dataflow
# Outputs:
#   Lists the candidate outputs. On success prints SUCCEED; for dataflow it
#   also removes the GCS outputs. On failure calls `complete` and exits 1.
#######################################
function verify_user_score() {
  local expected_output_file_name="$USERSCORE_OUTPUT_PREFIX-$1-runner.txt"
  local actual_output_files
  if [[ $1 == *"dataflow"* ]]; then
    actual_output_files=$(gsutil ls "gs://$BUCKET_NAME")
    expected_output_file_name="gs://$BUCKET_NAME/$expected_output_file_name"
  else
    actual_output_files=$(ls)
  fi
  printf '%s\n' "$actual_output_files"
  # Quote the name so it is matched literally, not as a glob pattern.
  if [[ $actual_output_files != *"$expected_output_file_name"* ]]; then
    echo "ERROR: The userscore example failed on $1-runner."
    complete "failed when running userscore example with $1-runner."
    exit 1
  fi

  if [[ $1 == *"dataflow"* ]]; then
    # Quoted: gsutil expands the wildcard remotely; the local shell must not.
    gsutil rm "${expected_output_file_name}*"
  fi
  echo "SUCCEED: user_score successfully run on $1-runner."
}


#######################################
# Verify results of hourly_team_score by polling its BigQuery table.
# Globals:
#   DATASET (read)
# Arguments:
#   $1 - Runner - direct, dataflow
# Outputs:
#   Returns 0 once the table output contains the expected team. After 3
#   unsuccessful attempts it calls `complete` and exits 1.
#######################################
function verify_hourly_team_score() {
  local retry=3
  local -r should_see='AntiqueBrassPlatypus'
  local -r table="$DATASET.hourly_team_score_python_$1"
  local bq_pull_result
  while (( retry > 0 )); do
    # A failing `bq head` (e.g. table not written yet) counts as a failed
    # attempt instead of aborting the whole script under `set -e`.
    bq_pull_result=$(bq head -n 500 "$table") || bq_pull_result=""
    if [[ $bq_pull_result == *"$should_see"* ]]; then
      echo "SUCCEED: hourly_team_score example successful run on $1-runner"
      return 0
    fi
    retry=$((retry - 1))
    echo "Did not find team scores, retry left: $retry"
    if (( retry > 0 )); then
      sleep 15
    fi
  done
  echo "FAILED: HourlyTeamScore example failed running on $1-runner. Did not find scores of team $should_see in $table"
  complete "failed when running hourly_team_score example with $1-runner."
  exit 1
}


# Python RC configurations
VERSION=$(get_version)
RC_STAGING_URL="https://dist.apache.org/repos/dist/dev/beam/$VERSION/python"

# Cloud Configurations
PROJECT_ID='apache-beam-testing'
BUCKET_NAME='temp-storage-for-release-validation-tests/nightly-snapshot-validation'
TEMP_DIR='/tmp'
# BigQuery dataset; also read by the mobile gaming verification
# (verify_hourly_team_score). Defined once so the two cannot drift apart.
DATASET='beam_postrelease_mobile_gaming'
NUM_WORKERS=1

# Wordcount / streaming wordcount configurations
WORDCOUNT_OUTPUT='wordcount_direct.txt'
PUBSUB_TOPIC1='wordstream-python-topic-1'
PUBSUB_TOPIC2='wordstream-python-topic-2'
PUBSUB_SUBSCRIPTION='wordstream-python-sub2'

# Mobile Gaming Configurations (the BigQuery dataset is DATASET above)
USERSCORE_OUTPUT_PREFIX='python-userscore_result'
GAME_INPUT_DATA='gs://dataflow-samples/game/5000_gaming_data.csv'
