| #!/usr/bin/env bash |
| |
| ############################################################################### |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| ############################################################################### |
| |
| COMMAND_STANDALONE="standalone-job" |
| COMMAND_HISTORY_SERVER="history-server" |
| |
| # If unspecified, the hostname of the container is taken as the JobManager address |
| JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)} |
| CONF_FILE_DIR="${FLINK_HOME}/conf" |
| |
| drop_privs_cmd() { |
| if [ $(id -u) != 0 ]; then |
| # Don't need to drop privs if EUID != 0 |
| return |
| elif [ -x /sbin/su-exec ]; then |
| # Alpine |
| echo su-exec flink |
| else |
| # Others |
| echo gosu flink |
| fi |
| } |
| |
| copy_plugins_if_required() { |
| if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then |
| return 0 |
| fi |
| |
| echo "Enabling required built-in plugins" |
| for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do |
| echo "Linking ${target_plugin} to plugin directory" |
| plugin_name=${target_plugin%.jar} |
| |
| mkdir -p "${FLINK_HOME}/plugins/${plugin_name}" |
| if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then |
| echo "Plugin ${target_plugin} does not exist. Exiting." |
| exit 1 |
| else |
| ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}" |
| echo "Successfully enabled ${target_plugin}" |
| fi |
| done |
| } |
| |
| set_config_options() { |
| local config_parser_script="$FLINK_HOME/bin/config-parser-utils.sh" |
| local config_dir="$FLINK_HOME/conf" |
| local bin_dir="$FLINK_HOME/bin" |
| local lib_dir="$FLINK_HOME/lib" |
| |
| local config_params=() |
| |
| while [ $# -gt 0 ]; do |
| local key="$1" |
| local value="$2" |
| |
| config_params+=("-D${key}=${value}") |
| |
| shift 2 |
| done |
| |
| if [ "${#config_params[@]}" -gt 0 ]; then |
| "${config_parser_script}" "${config_dir}" "${bin_dir}" "${lib_dir}" "${config_params[@]}" |
| fi |
| } |
| |
| prepare_configuration() { |
| local config_options=() |
| |
| config_options+=("jobmanager.rpc.address" "${JOB_MANAGER_RPC_ADDRESS}") |
| config_options+=("blob.server.port" "6124") |
| config_options+=("query.server.port" "6125") |
| |
| if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then |
| config_options+=("taskmanager.numberOfTaskSlots" "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}") |
| fi |
| |
| if [ ${#config_options[@]} -ne 0 ]; then |
| set_config_options "${config_options[@]}" |
| fi |
| |
| if [ -n "${FLINK_PROPERTIES}" ]; then |
| process_flink_properties "${FLINK_PROPERTIES}" |
| fi |
| } |
| |
| process_flink_properties() { |
| local flink_properties_content=$1 |
| local config_options=() |
| |
| local OLD_IFS="$IFS" |
| IFS=$'\n' |
| for prop in $flink_properties_content; do |
| prop=$(echo $prop | tr -d '[:space:]') |
| |
| if [ -z "$prop" ]; then |
| continue |
| fi |
| |
| IFS=':' read -r key value <<< "$prop" |
| |
| value=$(echo $value | envsubst) |
| |
| config_options+=("$key" "$value") |
| done |
| IFS="$OLD_IFS" |
| |
| if [ ${#config_options[@]} -ne 0 ]; then |
| set_config_options "${config_options[@]}" |
| fi |
| } |
| |
| maybe_enable_jemalloc() { |
| if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then |
| JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so" |
| JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so" |
| if [ -f "$JEMALLOC_PATH" ]; then |
| export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH |
| elif [ -f "$JEMALLOC_FALLBACK" ]; then |
| export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK |
| else |
| if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then |
| MSG_PATH=$JEMALLOC_PATH |
| else |
| MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK" |
| fi |
| echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead." |
| fi |
| fi |
| } |
| |
| maybe_enable_jemalloc |
| |
| copy_plugins_if_required |
| |
| prepare_configuration |
| |
| args=("$@") |
| if [ "$1" = "help" ]; then |
| printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n" |
| printf " Or $(basename "$0") help\n\n" |
| printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n" |
| exit 0 |
| elif [ "$1" = "jobmanager" ]; then |
| args=("${args[@]:1}") |
| |
| echo "Starting Job Manager" |
| |
| exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}" |
| elif [ "$1" = ${COMMAND_STANDALONE} ]; then |
| args=("${args[@]:1}") |
| |
| echo "Starting Job Manager" |
| |
| exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}" |
| elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then |
| args=("${args[@]:1}") |
| |
| echo "Starting History Server" |
| |
| exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}" |
| elif [ "$1" = "taskmanager" ]; then |
| args=("${args[@]:1}") |
| |
| echo "Starting Task Manager" |
| |
| exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}" |
| fi |
| |
| args=("${args[@]}") |
| |
| # Running command in pass-through mode |
| exec $(drop_privs_cmd) "${args[@]}" |