#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
set -o pipefail
if [[ -z $FLINK_DIR ]]; then
echo "FLINK_DIR needs to point to a Flink distribution directory"
exit 1
fi
export EXIT_CODE=0
echo "Flink dist directory: $FLINK_DIR"
TEST_ROOT=$(pwd)
# resolve the absolute path of the directory containing this script
TEST_INFRA_DIR=$(dirname "$0")
cd "$TEST_INFRA_DIR"
TEST_INFRA_DIR=$(pwd)
cd "$TEST_ROOT"
# used to randomize created directories
export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
echo "TEST_DATA_DIR: $TEST_DATA_DIR"
function backup_config() {
# back up the masters and flink-conf.yaml
cp $FLINK_DIR/conf/masters $FLINK_DIR/conf/masters.bak
cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
}
function revert_default_config() {
# revert our modifications to the masters file
if [ -f $FLINK_DIR/conf/masters.bak ]; then
mv -f $FLINK_DIR/conf/masters.bak $FLINK_DIR/conf/masters
fi
# revert our modifications to the Flink conf yaml
if [ -f $FLINK_DIR/conf/flink-conf.yaml.bak ]; then
mv -f $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
fi
}
function set_conf() {
CONF_NAME=$1
VAL=$2
echo "$CONF_NAME: $VAL" >> $FLINK_DIR/conf/flink-conf.yaml
}
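# Example usage of set_conf (hypothetical key and value); this appends
# 'taskmanager.numberOfTaskSlots: 8' to flink-conf.yaml:
#   set_conf "taskmanager.numberOfTaskSlots" "8"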
function change_conf() {
CONF_NAME=$1
OLD_VAL=$2
NEW_VAL=$3
sed -i -e "s/${CONF_NAME}: ${OLD_VAL}/${CONF_NAME}: ${NEW_VAL}/" ${FLINK_DIR}/conf/flink-conf.yaml
}
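# Example usage of change_conf (hypothetical values); rewrites an existing
# 'taskmanager.numberOfTaskSlots: 4' entry in place:
#   change_conf "taskmanager.numberOfTaskSlots" "4" "8"
# Note that the arguments are interpolated into a sed expression, so values
# containing sed metacharacters such as '/' will not match.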
function create_ha_config() {
backup_config
# clean up the dir that will be used for zookeeper storage
# (see high-availability.zookeeper.storageDir below)
if [ -e $TEST_DATA_DIR/recovery ]; then
echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..."
rm -rf $TEST_DATA_DIR/recovery
fi
# create the masters file (only one currently).
# This must have all the masters to be used in HA.
echo "localhost:8081" > ${FLINK_DIR}/conf/masters
# then move on to create the flink-conf.yaml
sed 's/^ //g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
#==============================================================================
# Common
#==============================================================================
jobmanager.rpc.address: localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 1024
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 4
#==============================================================================
# High Availability
#==============================================================================
high-availability: zookeeper
high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/
high-availability.zookeeper.quorum: localhost:2181
high-availability.zookeeper.path.root: /flink
high-availability.cluster-id: /test_cluster_one
#==============================================================================
# Web Frontend
#==============================================================================
rest.port: 8081
EOL
}
function start_ha_cluster {
create_ha_config
start_local_zk
start_cluster
}
function start_local_zk {
# Parses zoo.cfg and starts ZooKeeper locally.
# This is essentially the same logic as bin/start-zookeeper-quorum.sh,
# minus the SSH part and restricted to localhost.
while read server ; do
server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
# match server.id=address[:port[:port]]
if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
id=${BASH_REMATCH[1]}
address=${BASH_REMATCH[2]}
if [ "${address}" != "localhost" ]; then
echo "[ERROR] Parse error. Only available for localhost."
exit 1
fi
${FLINK_DIR}/bin/zookeeper.sh start $id
else
echo "[WARN] Parse error. Skipping config entry '$server'."
fi
done < <(grep "^server\." "${FLINK_DIR}/conf/zoo.cfg")
}
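# Example of a zoo.cfg entry accepted by start_local_zk above (any address
# other than localhost is rejected):
#   server.1=localhost:2888:3888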
function start_cluster {
"$FLINK_DIR"/bin/start-cluster.sh
# wait at most 10 seconds until the dispatcher is up
for i in {1..10}; do
# without the || true this would exit our script if the JobManager is not yet up
QUERY_RESULT=$(curl "http://localhost:8081/taskmanagers" 2> /dev/null || true)
# ensure the taskmanagers field is there at all and is not empty
if [[ ${QUERY_RESULT} =~ \{\"taskmanagers\":\[.+\]\} ]]; then
echo "Dispatcher REST endpoint is up."
break
fi
echo "Waiting for dispatcher REST endpoint to come up..."
sleep 1
done
}
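# Greps all Flink logs for the string "error" (case-insensitive), filtering out
# a whitelist of known benign messages (Kafka rebalancing noise, Akka/netty
# warnings, expected S3 HeadBucket and Hadoop classpath messages), and sets
# EXIT_CODE=1 if anything remains.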
function check_logs_for_errors {
if grep -rv "GroupCoordinatorNotAvailableException" $FLINK_DIR/log \
| grep -v "RetriableCommitFailedException" \
| grep -v "NoAvailableBrokersException" \
| grep -v "Async Kafka commit failed" \
| grep -v "DisconnectException" \
| grep -v "AskTimeoutException" \
| grep -v "WARN akka.remote.transport.netty.NettyTransport" \
| grep -v "WARN org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \
| grep -v "jvm-exit-on-fatal-error" \
| grep -v '^INFO:.*AWSErrorCode=\[400 Bad Request\].*ServiceEndpoint=\[https://.*\.s3\.amazonaws\.com\].*RequestType=\[HeadBucketRequest\]' \
| grep -v "RejectedExecutionException" \
| grep -v "An exception was thrown by an exception handler" \
| grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException" \
| grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration" \
| grep -iq "error"; then
echo "Found error in log files:"
cat $FLINK_DIR/log/*
EXIT_CODE=1
fi
}
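# Like check_logs_for_errors, but greps for "exception" with a slightly wider
# whitelist that also covers expected shutdown and suspension exceptions.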
function check_logs_for_exceptions {
if grep -rv "GroupCoordinatorNotAvailableException" $FLINK_DIR/log \
| grep -v "RetriableCommitFailedException" \
| grep -v "NoAvailableBrokersException" \
| grep -v "Async Kafka commit failed" \
| grep -v "DisconnectException" \
| grep -v "AskTimeoutException" \
| grep -v "WARN akka.remote.transport.netty.NettyTransport" \
| grep -v "WARN org.apache.flink.shaded.akka.org.jboss.netty.channel.DefaultChannelPipeline" \
| grep -v '^INFO:.*AWSErrorCode=\[400 Bad Request\].*ServiceEndpoint=\[https://.*\.s3\.amazonaws\.com\].*RequestType=\[HeadBucketRequest\]' \
| grep -v "RejectedExecutionException" \
| grep -v "An exception was thrown by an exception handler" \
| grep -v "Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.yarn.exceptions.YarnException" \
| grep -v "Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.conf.Configuration" \
| grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/yarn/exceptions/YarnException" \
| grep -v "java.lang.NoClassDefFoundError: org/apache/hadoop/conf/Configuration" \
| grep -v "java.lang.Exception: Execution was suspended" \
| grep -v "Caused by: java.lang.Exception: JobManager is shutting down" \
| grep -iq "exception"; then
echo "Found exception in log files:"
cat $FLINK_DIR/log/*
EXIT_CODE=1
fi
}
function check_logs_for_non_empty_out_files {
if grep -ri "." $FLINK_DIR/log/*.out > /dev/null; then
echo "Found non-empty .out files:"
cat $FLINK_DIR/log/*.out
EXIT_CODE=1
fi
}
function stop_cluster {
"$FLINK_DIR"/bin/stop-cluster.sh
# stop zookeeper only if there are processes running
if ! [ "`jps | grep 'FlinkZooKeeperQuorumPeer' | wc -l`" = "0" ]; then
"$FLINK_DIR"/bin/zookeeper.sh stop
fi
}
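# Polls 'flink list' once per second, for up to 10 seconds, until the given job
# (matched by id or name) appears. Note that it returns normally even if the
# job never shows up within the timeout.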
function wait_job_running {
for i in {1..10}; do
JOB_LIST_RESULT=$("$FLINK_DIR"/bin/flink list | grep "$1")
if [[ "$JOB_LIST_RESULT" == "" ]]; then
echo "Job ($1) is not yet running."
else
echo "Job ($1) is running."
break
fi
sleep 1
done
}
function wait_job_terminal_state {
local job=$1
local terminal_state=$2
echo "Waiting for job ($job) to reach terminal state $terminal_state ..."
while : ; do
N=$(grep -o "Job $job reached globally terminal state $terminal_state" $FLINK_DIR/log/*standalonesession*.log | tail -1)
if [[ -z $N ]]; then
sleep 1
else
break
fi
done
}
function take_savepoint {
"$FLINK_DIR"/bin/flink savepoint $1 $2
}
function cancel_job {
"$FLINK_DIR"/bin/flink cancel $1
}
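# Example usage of take_savepoint and cancel_job (hypothetical job id variable
# and savepoint target directory):
#   take_savepoint ${JOB_ID} "$TEST_DATA_DIR/savepoints"
#   cancel_job ${JOB_ID}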
function check_result_hash {
local name=$1
local outfile_prefix=$2
local expected=$3
local actual
if [ "`command -v md5`" != "" ]; then
actual=$(LC_ALL=C sort $outfile_prefix* | md5 -q)
elif [ "`command -v md5sum`" != "" ]; then
actual=$(LC_ALL=C sort $outfile_prefix* | md5sum | awk '{print $1}')
else
echo "Neither 'md5' nor 'md5sum' binary available."
exit 2
fi
if [[ "$actual" != "$expected" ]]
then
echo "FAIL $name: Output hash mismatch. Got $actual, expected $expected."
echo "head hexdump of actual:"
head $outfile_prefix* | hexdump -c
exit 1
else
echo "pass $name"
# Output files are left behind in /tmp
fi
}
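# Example usage of check_result_hash (hypothetical output prefix and md5 hash):
# compares the md5 of the sorted concatenation of all files matching the prefix
# against the expected hash:
#   check_result_hash "WordCount" "$TEST_DATA_DIR/out/result" "e682ec6622b5e83f2eb614617d5ab2cf"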
function s3_put {
local_file=$1
bucket=$2
s3_file=$3
resource="/${bucket}/${s3_file}"
contentType="application/octet-stream"
dateValue=$(date -R)
stringToSign="PUT\n\n${contentType}\n${dateValue}\n${resource}"
s3Key=$ARTIFACTS_AWS_ACCESS_KEY
s3Secret=$ARTIFACTS_AWS_SECRET_KEY
signature=$(echo -en "${stringToSign}" | openssl sha1 -hmac "${s3Secret}" -binary | base64)
curl -X PUT -T "${local_file}" \
-H "Host: ${bucket}.s3.amazonaws.com" \
-H "Date: ${dateValue}" \
-H "Content-Type: ${contentType}" \
-H "Authorization: AWS ${s3Key}:${signature}" \
https://${bucket}.s3.amazonaws.com/${s3_file}
}
function s3_delete {
bucket=$1
s3_file=$2
resource="/${bucket}/${s3_file}"
contentType="application/octet-stream"
dateValue=$(date -R)
stringToSign="DELETE\n\n${contentType}\n${dateValue}\n${resource}"
s3Key=$ARTIFACTS_AWS_ACCESS_KEY
s3Secret=$ARTIFACTS_AWS_SECRET_KEY
signature=$(echo -en "${stringToSign}" | openssl sha1 -hmac "${s3Secret}" -binary | base64)
curl -X DELETE \
-H "Host: ${bucket}.s3.amazonaws.com" \
-H "Date: ${dateValue}" \
-H "Content-Type: ${contentType}" \
-H "Authorization: AWS ${s3Key}:${signature}" \
https://${bucket}.s3.amazonaws.com/${s3_file}
}
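# The two functions above sign requests with AWS Signature Version 2 (HMAC-SHA1
# over a canonical string) and expect ARTIFACTS_AWS_ACCESS_KEY and
# ARTIFACTS_AWS_SECRET_KEY to be set. Example usage (hypothetical bucket/key):
#   s3_put "$TEST_DATA_DIR/out/result" my-test-bucket artifacts/result
#   s3_delete my-test-bucket artifacts/result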
# This function starts the given number of task managers and monitors their processes.
# If a task manager process goes away, a replacement is started.
function tm_watchdog {
local expectedTm=$1
while true; do
runningTm=$(jps | grep -Eo 'TaskManagerRunner|TaskManager' | wc -l)
count=$((expectedTm-runningTm))
for (( c=0; c<count; c++ )); do
"$FLINK_DIR"/bin/taskmanager.sh start > /dev/null
done
sleep 5
done
}
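# tm_watchdog loops forever, so it is meant to be run in the background and
# killed once the test is done. A sketch of the intended usage:
#   tm_watchdog 3 &
#   WATCHDOG_PID=$!
#   ... run the test ...
#   kill ${WATCHDOG_PID}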
# Kills all job managers.
function jm_kill_all {
kill_all 'StandaloneSessionClusterEntrypoint'
}
# Kills all task managers.
function tm_kill_all {
kill_all 'TaskManagerRunner|TaskManager'
}
# Kills all processes that match the given name.
function kill_all {
local pid=$(jps | grep -E "${1}" | cut -d " " -f 1)
kill ${pid} 2> /dev/null
wait ${pid} 2> /dev/null
}
function kill_random_taskmanager {
KILL_TM=$(jps | grep "TaskManager" | sort -R | head -n 1 | awk '{print $1}')
kill -9 "$KILL_TM"
echo "TaskManager $KILL_TM killed."
}
function setup_flink_slf4j_metric_reporter() {
INTERVAL="${1:-1 SECONDS}"
cp $FLINK_DIR/opt/flink-metrics-slf4j-*.jar $FLINK_DIR/lib/
set_conf "metrics.reporter.slf4j.class" "org.apache.flink.metrics.slf4j.Slf4jReporter"
set_conf "metrics.reporter.slf4j.interval" "${INTERVAL}"
}
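# Example usage (hypothetical interval; the default is "1 SECONDS"):
#   setup_flink_slf4j_metric_reporter "2 SECONDS"
# Use rollback_flink_slf4j_metric_reporter below to remove the jar again.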
function rollback_flink_slf4j_metric_reporter() {
rm $FLINK_DIR/lib/flink-metrics-slf4j-*.jar
}
function get_metric_processed_records {
OPERATOR=$1
N=$(grep ".General purpose test job.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1)
if [ -z "$N" ]; then
N=0
fi
echo $N
}
function get_num_metric_samples {
OPERATOR=$1
N=$(grep ".General purpose test job.$OPERATOR.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l)
if [ -z "$N" ]; then
N=0
fi
echo $N
}
function wait_oper_metric_num_in_records {
OPERATOR=$1
MAX_NUM_METRICS="${2:-200}"
NUM_METRICS=$(get_num_metric_samples ${OPERATOR})
OLD_NUM_METRICS=${3:-${NUM_METRICS}}
# monitor the numRecordsIn metric of the given operator in the second execution
# we let the test finish once the second (restored) execution has processed
# at least MAX_NUM_METRICS (default 200) records
while : ; do
NUM_METRICS=$(get_num_metric_samples ${OPERATOR})
NUM_RECORDS=$(get_metric_processed_records ${OPERATOR})
# only account for metrics that appeared in the second execution
if (( $OLD_NUM_METRICS >= $NUM_METRICS )) ; then
NUM_RECORDS=0
fi
if (( $NUM_RECORDS < $MAX_NUM_METRICS )); then
echo "Waiting for job to process up to 200 records, current progress: $NUM_RECORDS records ..."
sleep 1
else
break
fi
done
}
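# Example usage of wait_oper_metric_num_in_records (hypothetical operator name):
# block until the operator 'StateMachine' has processed at least 200 records:
#   wait_oper_metric_num_in_records StateMachine 200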
function wait_num_checkpoints {
JOB=$1
NUM_CHECKPOINTS=$2
echo "Waiting for job ($JOB) to have at least $NUM_CHECKPOINTS completed checkpoints ..."
while : ; do
N=$(grep -o "Completed checkpoint [1-9]* for job $JOB" $FLINK_DIR/log/*standalonesession*.log | awk '{print $3}' | tail -1)
if [ -z "$N" ]; then
N=0
fi
if (( N < NUM_CHECKPOINTS )); then
sleep 1
else
break
fi
done
}
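# Example usage of wait_num_checkpoints (hypothetical job id variable):
# wait until at least 5 checkpoints have completed:
#   wait_num_checkpoints ${JOB_ID} 5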
# Starts the timer by resetting bash's built-in SECONDS counter. Note that nested timers are not supported.
function start_timer {
SECONDS=0
}
# Prints the number of minutes and seconds that have elapsed since the last call to start_timer.
function end_timer {
duration=$SECONDS
echo "$(($duration / 60)) minutes and $(($duration % 60)) seconds"
}
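# Example usage of the timer helpers:
#   start_timer
#   run_test_logic   # hypothetical test body
#   echo "Test duration: $(end_timer)"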