#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
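# End-to-end test for the StreamingFileSink: runs a job that writes records to
# local or S3 storage, kills TaskManagers while the job is running, and checks
# that the recovered job still produces the complete, correct output.
# The output type is selected by the first argument ("local", the default, or "s3").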
OUT_TYPE="${1:-local}" # other type: s3
source "$(dirname "$0")"/common.sh
source "$(dirname "$0")"/common_s3.sh
s3_setup hadoop
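# Enable mutual SSL for the cluster and make the metrics fetcher poll every
# 2 seconds (the interval is in milliseconds) so job progress is visible promptly.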
set_conf_ssl "mutual"
set_conf "metrics.fetcher.update-interval" "2000"
OUT=temp/test_streaming_file_sink-$(uuidgen)
OUTPUT_PATH="$TEST_DATA_DIR/$OUT"
S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$OUT"
mkdir -p "${OUTPUT_PATH}"
if [ "${OUT_TYPE}" == "local" ]; then
echo "Use local output"
JOB_OUTPUT_PATH=${OUTPUT_PATH}
elif [ "${OUT_TYPE}" == "s3" ]; then
echo "Use s3 output"
JOB_OUTPUT_PATH=${S3_OUTPUT_PATH}
else
echo "Unknown output type: ${OUT_TYPE}"
exit 1
fi
# make sure we delete the S3 output at the end
function out_cleanup {
  s3_delete_by_full_path_prefix "${OUT}"
}
if [ "${OUT_TYPE}" == "s3" ]; then
trap out_cleanup EXIT
fi
TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-streaming-file-sink-test/target/StreamingFileSinkProgram.jar"
###################################
# Get all lines in part files and sort them numerically.
#
# Globals:
#   OUT_TYPE, OUTPUT_PATH, TEST_DATA_DIR, OUT
# Arguments:
#   None
# Returns:
#   sorted content of part files
###################################
function get_complete_result {
  if [ "${OUT_TYPE}" == "s3" ]; then
    rm -rf "${OUTPUT_PATH}"; mkdir -p "${OUTPUT_PATH}"
    s3_get_by_full_path_and_filename_prefix "${TEST_DATA_DIR}" "${OUT}" "part-"
  fi
  find "${OUTPUT_PATH}" -type f \( -iname "part-*" \) -exec cat {} + | sort -g
}

###################################
# Get the total number of lines in part files.
#
# Globals:
#   OUT_TYPE, OUT
# Arguments:
#   None
# Returns:
#   number of lines in part files
###################################
function get_total_number_of_valid_lines {
  if [ "${OUT_TYPE}" == "local" ]; then
    get_complete_result | wc -l | tr -d '[:space:]'
  elif [ "${OUT_TYPE}" == "s3" ]; then
    s3_get_number_of_lines_by_prefix "${OUT}" "part-"
  fi
}

###################################
# Waits until the given number of values has been written, within a timeout.
# If the timeout expires, exits with return code 1.
#
# Globals:
#   None
# Arguments:
#   $1: the number of expected values
#   $2: polling timeout in seconds
# Returns:
#   None
###################################
function wait_for_complete_result {
  local expected_number_of_values=$1
  local polling_timeout=$2
  local polling_interval=1
  local seconds_elapsed=0
  local number_of_values=0
  local previous_number_of_values=-1

  while [[ ${number_of_values} -lt ${expected_number_of_values} ]]; do
    if [[ ${seconds_elapsed} -ge ${polling_timeout} ]]; then
      echo "Did not produce expected number of values within ${polling_timeout}s"
      exit 1
    fi

    sleep ${polling_interval}
    ((seconds_elapsed += polling_interval))

    number_of_values=$(get_total_number_of_valid_lines)
    if [[ ${previous_number_of_values} -ne ${number_of_values} ]]; then
      echo "Number of produced values ${number_of_values}/${expected_number_of_values}"
      previous_number_of_values=${number_of_values}
    fi
  done
}
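# Example (as used at the end of this test):
#   wait_for_complete_result 60000 300
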
start_cluster
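# Bring up three extra TaskManagers; some of them are killed below to exercise
# failover and recovery from checkpoints.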
"${FLINK_DIR}/bin/taskmanager.sh" start
"${FLINK_DIR}/bin/taskmanager.sh" start
"${FLINK_DIR}/bin/taskmanager.sh" start
echo "Submitting job."
CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}" --outputPath "${JOB_OUTPUT_PATH}")
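# Extract the JobID from the last token of the submission message
# ("Job has been submitted with JobID <id>").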
JOB_ID=$(echo "${CLIENT_OUTPUT}" | grep "Job has been submitted with JobID" | sed 's/.* //g')
if [[ -z ${JOB_ID} ]]; then
  echo "Job could not be submitted."
  echo "${CLIENT_OUTPUT}"
  exit 1
fi

wait_job_running "${JOB_ID}"
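# Require a few completed checkpoints before inducing failures, so the sink
# has state to restore from.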
wait_num_checkpoints "${JOB_ID}" 3
echo "Killing TM"
kill_random_taskmanager
echo "Starting TM"
"$FLINK_DIR/bin/taskmanager.sh" start
wait_for_restart_to_complete 0 "${JOB_ID}"
echo "Killing 2 TMs"
kill_random_taskmanager
kill_random_taskmanager
echo "Starting 2 TMs"
"$FLINK_DIR/bin/taskmanager.sh" start
"$FLINK_DIR/bin/taskmanager.sh" start
wait_for_restart_to_complete 1 "${JOB_ID}"
echo "Waiting until all values have been produced"
wait_for_complete_result 60000 300
cancel_job "${JOB_ID}"
wait_job_terminal_state "${JOB_ID}" "CANCELED"
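# Collect the sorted output and verify it against the expected MD5 hash.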
get_complete_result > "${TEST_DATA_DIR}/complete_result"
check_result_hash "File Streaming Sink" "$TEST_DATA_DIR/complete_result" "6727342fdd3aae2129e61fc8f433fb6f"