blob: 5e73591b36d2d6ee24432e5838046f4165b328eb [file] [log] [blame]
#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
source "$(dirname "$0")"/common.sh
# Checks the Flink logs for entries that indicate problems with local recovery.
#
# Globals:
#   FLINK_DIR (read) - every file matching "${FLINK_DIR}/log/*" is searched.
# Arguments:
#   $1 - parallelism (accepted for interface compatibility; not used by the checks)
#   $2 - attempts    (accepted for interface compatibility; not used by the checks)
# Outputs:
#   A "FAILURE: ..." line on stdout when one of the checks fails.
# Exits:
#   Terminates the shell with status 1 if at least one "alternative (2/2)" restore is logged
#   or if no "alternative (1/2)" restore is logged at all. Returns normally otherwise.
function check_logs {
  local failed_local_recovery
  local attempt_local_recovery

  # Search for the log message that indicates restore problem from existing local state for the keyed backend.
  # FLINK_DIR is quoted so that a path containing whitespace is not split; the glob stays outside the quotes.
  # grep exits with 1 when nothing matches, which is the expected case here, hence the `|| true`.
  failed_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (2/2)\.$' "${FLINK_DIR}"/log/* | wc -l | tr -d ' ') || true

  # Search for attempts to recover locally.
  attempt_local_recovery=$(grep '^.*Creating keyed state backend.* from alternative (1/2)\.$' "${FLINK_DIR}"/log/* | wc -l | tr -d ' ') || true

  if [[ "${failed_local_recovery}" -ne 0 ]]; then
    echo "FAILURE: Found ${failed_local_recovery} failed attempt(s) for local recovery of correctly scheduled task(s)."
    exit 1
  fi

  if [[ "${attempt_local_recovery}" -eq 0 ]]; then
    echo "FAILURE: Found no attempt for local recovery. Configuration problem?"
    exit 1
  fi
}
# Cleans up after the test: the log4j root logger is switched back to INFO and the TM watchdog
# (if one was started) is terminated and reaped.
#
# Globals:
#   FLINK_DIR    (read) - location of conf/log4j.properties.
#   watchdog_pid (read) - PID of the background watchdog; may be unset or empty.
# Returns:
#   The status of `wait` for the watchdog when one was started, 0 otherwise.
function cleanup_after_test {
  # Reset the configurations
  sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=INFO, file/' "$FLINK_DIR/conf/log4j.properties"

  # Only touch the watchdog when its PID is known. With an empty PID, `kill` has no target and a
  # bare `wait` would block until every background job of this shell has finished.
  if [[ -n "${watchdog_pid:-}" ]]; then
    kill "${watchdog_pid}" 2> /dev/null
    wait "${watchdog_pid}" 2> /dev/null
  fi
}
# Runs the cleanup step for this test and then terminates the shell with status 1,
# regardless of what the cleanup step itself returned.
cleanup_after_test_and_exit_fail() {
  cleanup_after_test
  exit 1
}
## Executes one run of the local recovery test for a certain configuration.
##
## Globals:
##   FLINK_DIR, END_TO_END_DIR, TEST_DATA_DIR (read)
##   TEST_PROGRAM_JAR (written), watchdog_pid (written)
## Arguments:
##   $1 - parallelism of the job (also handed to the TM watchdog)
##   $2 - value for the job's --maxAttempts option
##   $3 - value for the job's --stateBackend option
##   $4 - value for the job's --incrementalCheckpoints option
##   $5 - value for the job's --killJvmOnFail option
## Outputs:
##   The chosen configuration and the watchdog PID on stdout.
## Exits:
##   check_logs terminates the shell with status 1 when the log check fails.
function run_local_recovery_test {
  local parallelism=$1
  local max_attempts=$2
  local backend=$3
  local incremental=$4
  local kill_jvm=$5

  echo "Running local recovery test with configuration:
parallelism: ${parallelism}
max attempts: ${max_attempts}
backend: ${backend}
incremental checkpoints: ${incremental}
kill JVM: ${kill_jvm}"

  TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-local-recovery-and-allocation-test/target/StickyAllocationAndLocalRecoveryTestJob.jar

  # Backup conf and configure for HA
  backup_config
  create_ha_config

  # Enable debug logging
  sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=DEBUG, file/' "$FLINK_DIR/conf/log4j.properties"

  # Enable local recovery
  set_conf "state.backend.local-recovery" "true"
  # Ensure that each TM only has one operator(chain)
  set_conf "taskmanager.numberOfTaskSlots" "1"

  # Start from empty logs so that check_logs only sees entries of this run. The `:?` guard aborts
  # instead of expanding to "/log/*" should FLINK_DIR be empty; a missing log file is not an error.
  rm -- "${FLINK_DIR:?}"/log/* 2> /dev/null

  # Start HA server
  start_local_zk
  start_cluster

  tm_watchdog "${parallelism}" &
  watchdog_pid=$!

  echo "Started TM watchdog with PID ${watchdog_pid}."

  # All path-like values are quoted so that directories containing whitespace stay single arguments.
  # NOTE(review): the -D option follows the jar path, so it looks like it is handed to the job as a
  # program argument rather than being interpreted by the CLI - confirm that this is intended.
  "${FLINK_DIR}/bin/flink" run -c org.apache.flink.streaming.tests.StickyAllocationAndLocalRecoveryTestJob \
    -p "${parallelism}" "${TEST_PROGRAM_JAR}" \
    -D state.backend.local-recovery=ENABLE_FILE_BASED \
    --checkpointDir "file://${TEST_DATA_DIR}/local_recovery_test/checkpoints" \
    --output "${TEST_DATA_DIR}/out/local_recovery_test/out" --killJvmOnFail "${kill_jvm}" --checkpointInterval 1000 \
    --maxAttempts "${max_attempts}" --parallelism "${parallelism}" --stateBackend "${backend}" \
    --incrementalCheckpoints "${incremental}"

  check_logs "${parallelism}" "${max_attempts}"
  cleanup_after_test
}
## MAIN
# Install the failure handler first: any exit that happens while the test is running (for example
# one of the `exit 1` calls in check_logs) triggers the cleanup and ends the script with status 1.
trap cleanup_after_test_and_exit_fail EXIT
# Forward all script arguments (parallelism, max attempts, backend, incremental, kill JVM) unchanged.
run_local_recovery_test "$@"
# The run returned normally and has already cleaned up, so remove the handler again; otherwise it
# would turn the `exit 0` below into a failure.
trap - EXIT
exit 0