#!/usr/bin/env bash
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# shellcheck source=common.sh
source "$(dirname "$0")"/common.sh
# shellcheck source=common_yarn_docker.sh
source "$(dirname "$0")"/common_yarn_docker.sh
# Configure Flink dir before making tarball.
INPUT_TYPE=${1:-default-input}
case $INPUT_TYPE in
    (default-input)
        INPUT_ARGS=""
    ;;
    (dummy-fs)
        # shellcheck source=common_dummy_fs.sh
        source "$(dirname "$0")"/common_dummy_fs.sh
        dummy_fs_setup
        INPUT_ARGS="--input dummy://localhost/words --input anotherDummy://localhost/words"
    ;;
    (*)
        echo "Unknown input type ${INPUT_TYPE}"
        exit 1
    ;;
esac
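# Start the Dockerized Hadoop cluster and make the prepared Flink distribution
# available on it (helper from common_yarn_docker.sh).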
start_hadoop_cluster_and_prepare_flink
# Randomize the output path in case it already exists, e.g. when Docker
# containers were cached from a previous run.
OUTPUT_PATH=hdfs:///user/hadoop-user/wc-out-$RANDOM
# It's important to run this with higher parallelism: otherwise the JM and all
# TMs might end up on the same YARN node, and keytab shipping to other nodes
# would not be tested.
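# In application mode (-t yarn-application) the job's main() runs on the
# cluster side, so the user jar and the keytab must be shipped to YARN.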
if docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
    /home/hadoop-user/${FLINK_DIRNAME}/bin/flink run \
    -t yarn-application \
    -Dtaskmanager.numberOfTaskSlots=1 \
    -Dtaskmanager.memory.process.size=1000m \
    -Djobmanager.memory.process.size=1000m \
    -Dparallelism.default=3 \
    -Dtaskmanager.memory.jvm-metaspace.size=128m \
    /home/hadoop-user/${FLINK_DIRNAME}/examples/streaming/WordCount.jar ${INPUT_ARGS} --output ${OUTPUT_PATH}";
then
    echo "Flink YARN Application submitted."
else
    echo "Running the job failed."
    exit 1
fi
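# Block until the YARN application submitted above reaches a final state.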
wait_for_single_yarn_application
# The application has finished, so its output should be available now.
OUTPUT=$(get_output "${OUTPUT_PATH}/*/*")
echo "==== OUTPUT_BEGIN ===="
echo "${OUTPUT}"
echo "==== OUTPUT_END ===="
YARN_APPLICATION_LOGS=$(get_yarn_application_logs)
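# With the secured setup, the containers should have received their initial
# delegation tokens from the resource manager; the logs must confirm this.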
if [[ ! "${YARN_APPLICATION_LOGS}" =~ "Receive initial delegation tokens from resource manager" ]]; then
echo "YARN logs does not contain delegation token usage message as required"
exit 1
fi
echo "Running Job without configured keytab, the exception you see below is expected"
docker exec master bash -c "echo \"\" > /home/hadoop-user/${FLINK_DIRNAME}/conf/config.yaml"
# verify that it doesn't work if we don't configure a keytab
docker exec master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && \
/home/hadoop-user/${FLINK_DIRNAME}/bin/flink run \
-t yarn-application \
-Dtaskmanager.numberOfTaskSlots=1 \
-Dtaskmanager.memory.process.size=1000m \
-Djobmanager.memory.process.size=1000m \
-Dparallelism.default=3 \
-Dtaskmanager.memory.jvm-metaspace.size=128m \
/home/hadoop-user/${FLINK_DIRNAME}/examples/streaming/WordCount.jar --output ${OUTPUT_PATH}" > stderrAndstdoutFile 2>&1
STD_ERR_AND_STD_OUT=$(cat stderrAndstdoutFile)
rm stderrAndstdoutFile
echo "==== STD_ERR_AND_STD_OUT_BEGIN ===="
echo "${STD_ERR_AND_STD_OUT}"
echo "==== STD_ERR_AND_STD_OUT_END ===="
if [[ ! "${STD_ERR_AND_STD_OUT}" =~ "Hadoop security with Kerberos is enabled but the login user does not have Kerberos credentials" ]]; then
echo "Output does not contain the Kerberos error message as required"
exit 1
fi