blob: a83e2a21384e78db4168b980c9f27d56050dce4c [file] [log] [blame]
#!/usr/bin/env bash
###############################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################
COMMAND_STANDALONE="standalone-job"
# Deprecated, should be remove in Flink release 1.13
COMMAND_NATIVE_KUBERNETES="native-k8s"
COMMAND_HISTORY_SERVER="history-server"
COMMAND_DISABLE_JEMALLOC="disable-jemalloc"
# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"
drop_privs_cmd() {
if [ $(id -u) != 0 ]; then
# Don't need to drop privs if EUID != 0
return
elif [ -x /sbin/su-exec ]; then
# Alpine
echo su-exec flink
else
# Others
echo gosu flink
fi
}
copy_plugins_if_required() {
if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
return 0
fi
echo "Enabling required built-in plugins"
for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
echo "Linking ${target_plugin} to plugin directory"
plugin_name=${target_plugin%.jar}
mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
echo "Plugin ${target_plugin} does not exist. Exiting."
exit 1
else
ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
echo "Successfully enabled ${target_plugin}"
fi
done
}
set_config_option() {
local option=$1
local value=$2
# escape periods for usage in regular expressions
local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")
# either override an existing entry, or append a new one
if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
else
echo "${option}: ${value}" >> "${CONF_FILE}"
fi
}
set_common_options() {
set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
set_config_option blob.server.port 6124
set_config_option query.server.port 6125
}
prepare_job_manager_start() {
echo "Starting Job Manager"
copy_plugins_if_required
set_common_options
if [ -n "${FLINK_PROPERTIES}" ]; then
echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
fi
envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}
disable_jemalloc_env() {
# use nameref '_args' to update the passed 'args' within function
local -n _args=$1
if [ "${_args[0]}" = ${COMMAND_DISABLE_JEMALLOC} ]; then
echo "Disable Jemalloc as the memory allocator"
_args=("${_args[@]:1}")
else
echo "Enable Jemalloc as the memory allocator via appending env variable LD_PRELOAD with /usr/lib/x86_64-linux-gnu/libjemalloc.so"
export LD_PRELOAD=$LD_PRELOAD:/usr/lib/x86_64-linux-gnu/libjemalloc.so
fi
}
args=("$@")
if [ "$1" = "help" ]; then
printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER}) [${COMMAND_DISABLE_JEMALLOC}]\n"
printf " Or $(basename "$0") help\n\n"
printf "By default, Flink image adopts jemalloc as default memory allocator and will disable jemalloc if option '${COMMAND_DISABLE_JEMALLOC}' given.\n"
exit 0
elif [ "$1" = "jobmanager" ]; then
args=("${args[@]:1}")
disable_jemalloc_env args
prepare_job_manager_start
exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
args=("${args[@]:1}")
disable_jemalloc_env args
prepare_job_manager_start
exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
args=("${args[@]:1}")
disable_jemalloc_env args
echo "Starting History Server"
copy_plugins_if_required
if [ -n "${FLINK_PROPERTIES}" ]; then
echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
fi
envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
args=("${args[@]:1}")
disable_jemalloc_env args
echo "Starting Task Manager"
copy_plugins_if_required
TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
set_common_options
set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
if [ -n "${FLINK_PROPERTIES}" ]; then
echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
fi
envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = "$COMMAND_NATIVE_KUBERNETES" ]; then
args=("${args[@]:1}")
disable_jemalloc_env args
copy_plugins_if_required
export _FLINK_HOME_DETERMINED=true
. $FLINK_HOME/bin/config.sh
export FLINK_CLASSPATH="`constructFlinkClassPath`:$INTERNAL_HADOOP_CLASSPATHS"
# Start commands for jobmanager and taskmanager are generated by Flink internally.
echo "Start command: ${args[@]}"
exec $(drop_privs_cmd) bash -c "${args[@]}"
fi
args=("${args[@]}")
disable_jemalloc_env args
copy_plugins_if_required
# Set the Flink related environments
export _FLINK_HOME_DETERMINED=true
. $FLINK_HOME/bin/config.sh
export FLINK_CLASSPATH="`constructFlinkClassPath`:$INTERNAL_HADOOP_CLASSPATHS"
echo "Running command in pass-through mode: ${args[@]}"
exec $(drop_privs_cmd) "${args[@]}"