[hotfix] Update template Flink dist bin scripts for Flink 1.11.1
diff --git a/template/flink-distribution/bin/config.sh b/template/flink-distribution/bin/config.sh
new file mode 100755
index 0000000..268f3fc
--- /dev/null
+++ b/template/flink-distribution/bin/config.sh
@@ -0,0 +1,574 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Backported due to FLINK-18639
+# This file can be removed when upgrading
+# to Flink 1.11.2 or 1.12
+
+constructFlinkClassPath() {
+ local FLINK_DIST
+ local FLINK_CLASSPATH
+
+ while read -d '' -r jarfile ; do
+ if [[ "$jarfile" =~ .*/flink-dist[^/]*.jar$ ]]; then
+ FLINK_DIST="$FLINK_DIST":"$jarfile"
+ elif [[ "$FLINK_CLASSPATH" == "" ]]; then
+ FLINK_CLASSPATH="$jarfile";
+ else
+ FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
+ fi
+ done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)
+
+ if [[ "$FLINK_DIST" == "" ]]; then
+ # write error message to stderr since stdout is stored as the classpath
+ (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
+
+ # exit function with empty classpath to force process failure
+ exit 1
+ fi
+
+ echo "$FLINK_CLASSPATH""$FLINK_DIST"
+}
+
+findFlinkDistJar() {
+ local FLINK_DIST="`find "$FLINK_LIB_DIR" -name 'flink-dist*.jar'`"
+
+ if [[ "$FLINK_DIST" == "" ]]; then
+ # write error message to stderr since stdout is stored as the classpath
+ (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
+
+ # exit function with empty classpath to force process failure
+ exit 1
+ fi
+
+ echo "$FLINK_DIST"
+}
+
+# These are used to mangle paths that are passed to java when using
+# cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere
+# but the windows java version expects them in Windows Format, i.e. C:\bla\blub.
+# "cygpath" can do the conversion.
+manglePath() {
+ UNAME=$(uname -s)
+ if [ "${UNAME:0:6}" == "CYGWIN" ]; then
+ echo `cygpath -w "$1"`
+ else
+ echo $1
+ fi
+}
+
+manglePathList() {
+ UNAME=$(uname -s)
+ # a path list, for example a java classpath
+ if [ "${UNAME:0:6}" == "CYGWIN" ]; then
+ echo `cygpath -wp "$1"`
+ else
+ echo $1
+ fi
+}
+
+# Looks up a config value by key from a simple YAML-style key-value map.
+# $1: key to look up
+# $2: default value to return if key does not exist
+# $3: config file to read from
+readFromConfig() {
+ local key=$1
+ local defaultValue=$2
+ local configFile=$3
+
+ # first extract the value with the given key (1st sed), then trim the result (2nd sed)
+ # if a key exists multiple times, take the "last" one (tail)
+ local value=`sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1`
+
+ [ -z "$value" ] && echo "$defaultValue" || echo "$value"
+}
+
+########################################################################################################################
+# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/flink-conf.yaml
+# -or- the respective environment variables are not set.
+########################################################################################################################
+
+
+# WARNING !!! , these values are only used if there is nothing else is specified in
+# conf/flink-conf.yaml
+
+DEFAULT_ENV_PID_DIR="/tmp" # Directory to store *.pid files to
+DEFAULT_ENV_LOG_MAX=5 # Maximum number of old log files to keep
+DEFAULT_ENV_JAVA_OPTS="" # Optional JVM args
+DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args (JobManager)
+DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args (TaskManager)
+DEFAULT_ENV_JAVA_OPTS_HS="" # Optional JVM args (HistoryServer)
+DEFAULT_ENV_JAVA_OPTS_CLI="" # Optional JVM args (Client)
+DEFAULT_ENV_SSH_OPTS="" # Optional SSH parameters running in cluster mode
+DEFAULT_YARN_CONF_DIR="" # YARN Configuration Directory, if necessary
+DEFAULT_HADOOP_CONF_DIR="" # Hadoop Configuration Directory, if necessary
+DEFAULT_HBASE_CONF_DIR="" # HBase Configuration Directory, if necessary
+
+########################################################################################################################
+# CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml
+########################################################################################################################
+
+KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"
+
+KEY_ENV_PID_DIR="env.pid.dir"
+KEY_ENV_LOG_DIR="env.log.dir"
+KEY_ENV_LOG_MAX="env.log.max"
+KEY_ENV_YARN_CONF_DIR="env.yarn.conf.dir"
+KEY_ENV_HADOOP_CONF_DIR="env.hadoop.conf.dir"
+KEY_ENV_HBASE_CONF_DIR="env.hbase.conf.dir"
+KEY_ENV_JAVA_HOME="env.java.home"
+KEY_ENV_JAVA_OPTS="env.java.opts"
+KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
+KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
+KEY_ENV_JAVA_OPTS_HS="env.java.opts.historyserver"
+KEY_ENV_JAVA_OPTS_CLI="env.java.opts.client"
+KEY_ENV_SSH_OPTS="env.ssh.opts"
+KEY_HIGH_AVAILABILITY="high-availability"
+KEY_ZK_HEAP_MB="zookeeper.heap.mb"
+
+########################################################################################################################
+# PATHS AND CONFIG
+########################################################################################################################
+
+target="$0"
+# For the case, the executable has been directly symlinked, figure out
+# the correct bin path by following its symlink up to an upper bound.
+# Note: we can't use the readlink utility here if we want to be POSIX
+# compatible.
+iteration=0
+while [ -L "$target" ]; do
+ if [ "$iteration" -gt 100 ]; then
+ echo "Cannot resolve path: You have a cyclic symlink in $target."
+ break
+ fi
+ ls=`ls -ld -- "$target"`
+ target=`expr "$ls" : '.* -> \(.*\)$'`
+ iteration=$((iteration + 1))
+done
+
+# Convert relative path to absolute path and resolve directory symlinks
+bin=`dirname "$target"`
+SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`
+
+# Define the main directory of the flink installation
+# If config.sh is called by pyflink-shell.sh in python bin directory(pip installed), then do not need to set the FLINK_HOME here.
+if [ -z "$_FLINK_HOME_DETERMINED" ]; then
+ FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`
+fi
+FLINK_LIB_DIR=$FLINK_HOME/lib
+FLINK_PLUGINS_DIR=$FLINK_HOME/plugins
+FLINK_OPT_DIR=$FLINK_HOME/opt
+
+
+# These need to be mangled because they are directly passed to java.
+# The above lib path is used by the shell script to retrieve jars in a
+# directory, so it needs to be unmangled.
+FLINK_HOME_DIR_MANGLED=`manglePath "$FLINK_HOME"`
+if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_HOME_DIR_MANGLED/conf; fi
+FLINK_BIN_DIR=$FLINK_HOME_DIR_MANGLED/bin
+DEFAULT_FLINK_LOG_DIR=$FLINK_HOME_DIR_MANGLED/log
+FLINK_CONF_FILE="flink-conf.yaml"
+YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}
+
+### Exported environment variables ###
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_PLUGINS_DIR
+# export /lib dir to access it during deployment of the Yarn staging files
+export FLINK_LIB_DIR
+# export /opt dir to access it for the SQL client
+export FLINK_OPT_DIR
+
+########################################################################################################################
+# ENVIRONMENT VARIABLES
+########################################################################################################################
+
+# read JAVA_HOME from config with no default value
+MY_JAVA_HOME=$(readFromConfig ${KEY_ENV_JAVA_HOME} "" "${YAML_CONF}")
+# check if config specified JAVA_HOME
+if [ -z "${MY_JAVA_HOME}" ]; then
+ # config did not specify JAVA_HOME. Use system JAVA_HOME
+ MY_JAVA_HOME=${JAVA_HOME}
+fi
+# check if we have a valid JAVA_HOME and if java is not available
+if [ -z "${MY_JAVA_HOME}" ] && ! type java > /dev/null 2> /dev/null; then
+ echo "Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME."
+ exit 1
+else
+ JAVA_HOME=${MY_JAVA_HOME}
+fi
+
+UNAME=$(uname -s)
+if [ "${UNAME:0:6}" == "CYGWIN" ]; then
+ JAVA_RUN=java
+else
+ if [[ -d $JAVA_HOME ]]; then
+ JAVA_RUN=$JAVA_HOME/bin/java
+ else
+ JAVA_RUN=java
+ fi
+fi
+
+# Define HOSTNAME if it is not already set
+if [ -z "${HOSTNAME}" ]; then
+ HOSTNAME=`hostname`
+fi
+
+IS_NUMBER="^[0-9]+$"
+
+# Verify that NUMA tooling is available
+command -v numactl >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+ FLINK_TM_COMPUTE_NUMA="false"
+else
+ # Define FLINK_TM_COMPUTE_NUMA if it is not already set
+ if [ -z "${FLINK_TM_COMPUTE_NUMA}" ]; then
+ FLINK_TM_COMPUTE_NUMA=$(readFromConfig ${KEY_TASKM_COMPUTE_NUMA} "false" "${YAML_CONF}")
+ fi
+fi
+
+if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
+ MAX_LOG_FILE_NUMBER=$(readFromConfig ${KEY_ENV_LOG_MAX} ${DEFAULT_ENV_LOG_MAX} "${YAML_CONF}")
+fi
+
+if [ -z "${FLINK_LOG_DIR}" ]; then
+ FLINK_LOG_DIR=$(readFromConfig ${KEY_ENV_LOG_DIR} "${DEFAULT_FLINK_LOG_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${YARN_CONF_DIR}" ]; then
+ YARN_CONF_DIR=$(readFromConfig ${KEY_ENV_YARN_CONF_DIR} "${DEFAULT_YARN_CONF_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${HADOOP_CONF_DIR}" ]; then
+ HADOOP_CONF_DIR=$(readFromConfig ${KEY_ENV_HADOOP_CONF_DIR} "${DEFAULT_HADOOP_CONF_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${HBASE_CONF_DIR}" ]; then
+ HBASE_CONF_DIR=$(readFromConfig ${KEY_ENV_HBASE_CONF_DIR} "${DEFAULT_HBASE_CONF_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${FLINK_PID_DIR}" ]; then
+ FLINK_PID_DIR=$(readFromConfig ${KEY_ENV_PID_DIR} "${DEFAULT_ENV_PID_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
+ FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")
+
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
+ FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
+ FLINK_ENV_JAVA_OPTS_TM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_TM} "${DEFAULT_ENV_JAVA_OPTS_TM}" "${YAML_CONF}")
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS_TM="$( echo "${FLINK_ENV_JAVA_OPTS_TM}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS_HS}" ]; then
+ FLINK_ENV_JAVA_OPTS_HS=$(readFromConfig ${KEY_ENV_JAVA_OPTS_HS} "${DEFAULT_ENV_JAVA_OPTS_HS}" "${YAML_CONF}")
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS_HS="$( echo "${FLINK_ENV_JAVA_OPTS_HS}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS_CLI}" ]; then
+ FLINK_ENV_JAVA_OPTS_CLI=$(readFromConfig ${KEY_ENV_JAVA_OPTS_CLI} "${DEFAULT_ENV_JAVA_OPTS_CLI}" "${YAML_CONF}")
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS_CLI="$( echo "${FLINK_ENV_JAVA_OPTS_CLI}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_SSH_OPTS}" ]; then
+ FLINK_SSH_OPTS=$(readFromConfig ${KEY_ENV_SSH_OPTS} "${DEFAULT_ENV_SSH_OPTS}" "${YAML_CONF}")
+fi
+
+# Define ZK_HEAP if it is not already set
+if [ -z "${ZK_HEAP}" ]; then
+ ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
+fi
+
+# High availability
+if [ -z "${HIGH_AVAILABILITY}" ]; then
+ HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
+ if [ -z "${HIGH_AVAILABILITY}" ]; then
+ # Try deprecated value
+ DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
+ if [ -z "${DEPRECATED_HA}" ]; then
+ HIGH_AVAILABILITY="none"
+ elif [ ${DEPRECATED_HA} == "standalone" ]; then
+ # Standalone is now 'none'
+ HIGH_AVAILABILITY="none"
+ else
+ HIGH_AVAILABILITY=${DEPRECATED_HA}
+ fi
+ fi
+fi
+
+# Arguments for the JVM. Used for job and task manager JVMs.
+# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
+# JobManagerOptions#TOTAL_PROCESS_MEMORY and TaskManagerOptions#TOTAL_PROCESS_MEMORY for that!
+if [ -z "${JVM_ARGS}" ]; then
+ JVM_ARGS=""
+fi
+
+# Check if deprecated HADOOP_HOME is set, and specify config path to HADOOP_CONF_DIR if it's empty.
+if [ -z "$HADOOP_CONF_DIR" ]; then
+ if [ -n "$HADOOP_HOME" ]; then
+ # HADOOP_HOME is set. Check if its a Hadoop 1.x or 2.x HADOOP_HOME path
+ if [ -d "$HADOOP_HOME/conf" ]; then
+ # It's Hadoop 1.x
+ HADOOP_CONF_DIR="$HADOOP_HOME/conf"
+ fi
+ if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
+ # It's Hadoop 2.2+
+ HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
+ fi
+ fi
+fi
+
+# if neither HADOOP_CONF_DIR nor HADOOP_CLASSPATH are set, use some common default (if available)
+if [ -z "$HADOOP_CONF_DIR" ] && [ -z "$HADOOP_CLASSPATH" ]; then
+ if [ -d "/etc/hadoop/conf" ]; then
+ echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set."
+ HADOOP_CONF_DIR="/etc/hadoop/conf"
+ fi
+fi
+
+# Check if deprecated HBASE_HOME is set, and specify config path to HBASE_CONF_DIR if it's empty.
+if [ -z "$HBASE_CONF_DIR" ]; then
+ if [ -n "$HBASE_HOME" ]; then
+ # HBASE_HOME is set.
+ if [ -d "$HBASE_HOME/conf" ]; then
+ HBASE_CONF_DIR="$HBASE_HOME/conf"
+ fi
+ fi
+fi
+
+# try and set HBASE_CONF_DIR to some common default if it's not set
+if [ -z "$HBASE_CONF_DIR" ]; then
+ if [ -d "/etc/hbase/conf" ]; then
+ echo "Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set."
+ HBASE_CONF_DIR="/etc/hbase/conf"
+ fi
+fi
+
+INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
+
+if [ -n "${HBASE_CONF_DIR}" ]; then
+ INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:${HBASE_CONF_DIR}"
+fi
+
+# Auxilliary function which extracts the name of host from a line which
+# also potentially includes topology information and the taskManager type
+extractHostName() {
+ # handle comments: extract first part of string (before first # character)
+ WORKER=`echo $1 | cut -d'#' -f 1`
+
+ # Extract the hostname from the network hierarchy
+ if [[ "$WORKER" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then
+ WORKER=${BASH_REMATCH[1]}
+ fi
+
+ echo $WORKER
+}
+
+# Auxilliary functions for log file rotation
+rotateLogFilesWithPrefix() {
+ dir=$1
+ prefix=$2
+ while read -r log ; do
+ rotateLogFile "$log"
+ # find distinct set of log file names, ignoring the rotation number (trailing dot and digit)
+ done < <(find "$dir" ! -type d -path "${prefix}*" | sed s/\.[0-9][0-9]*$// | sort | uniq)
+}
+
+rotateLogFile() {
+ log=$1;
+ num=$MAX_LOG_FILE_NUMBER
+ if [ -f "$log" -a "$num" -gt 0 ]; then
+ while [ $num -gt 1 ]; do
+ prev=`expr $num - 1`
+ [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
+ num=$prev
+ done
+ mv "$log" "$log.$num";
+ fi
+}
+
+readMasters() {
+ MASTERS_FILE="${FLINK_CONF_DIR}/masters"
+
+ if [[ ! -f "${MASTERS_FILE}" ]]; then
+ echo "No masters file. Please specify masters in 'conf/masters'."
+ exit 1
+ fi
+
+ MASTERS=()
+ WEBUIPORTS=()
+
+ MASTERS_ALL_LOCALHOST=true
+ GOON=true
+ while $GOON; do
+ read line || GOON=false
+ HOSTWEBUIPORT=$( extractHostName $line)
+
+ if [ -n "$HOSTWEBUIPORT" ]; then
+ HOST=$(echo $HOSTWEBUIPORT | cut -f1 -d:)
+ WEBUIPORT=$(echo $HOSTWEBUIPORT | cut -s -f2 -d:)
+ MASTERS+=(${HOST})
+
+ if [ -z "$WEBUIPORT" ]; then
+ WEBUIPORTS+=(0)
+ else
+ WEBUIPORTS+=(${WEBUIPORT})
+ fi
+
+ if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
+ MASTERS_ALL_LOCALHOST=false
+ fi
+ fi
+ done < "$MASTERS_FILE"
+}
+
+readWorkers() {
+ WORKERS_FILE="${FLINK_CONF_DIR}/workers"
+
+ if [[ ! -f "$WORKERS_FILE" ]]; then
+ echo "No workers file. Please specify workers in 'conf/workers'."
+ exit 1
+ fi
+
+ WORKERS=()
+
+ WORKERS_ALL_LOCALHOST=true
+ GOON=true
+ while $GOON; do
+ read line || GOON=false
+ HOST=$( extractHostName $line)
+ if [ -n "$HOST" ] ; then
+ WORKERS+=(${HOST})
+ if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
+ WORKERS_ALL_LOCALHOST=false
+ fi
+ fi
+ done < "$WORKERS_FILE"
+}
+
+# starts or stops TMs on all workers
+# TMWorkers start|stop
+TMWorkers() {
+ CMD=$1
+
+ readWorkers
+
+ if [ ${WORKERS_ALL_LOCALHOST} = true ] ; then
+ # all-local setup
+ for worker in ${WORKERS[@]}; do
+ "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
+ done
+ else
+ # non-local setup
+ # start/stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
+ command -v pdsh >/dev/null 2>&1
+ if [[ $? -ne 0 ]]; then
+ for worker in ${WORKERS[@]}; do
+ ssh -n $FLINK_SSH_OPTS $worker -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
+ done
+ else
+ PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${WORKERS[*]}") \
+ "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
+ fi
+ fi
+}
+
+runBashJavaUtilsCmd() {
+ local cmd=$1
+ local conf_dir=$2
+ local class_path=$3
+ local dynamic_args=${@:4}
+ class_path=`manglePathList "${class_path}"`
+
+ local output=`${JAVA_RUN} -classpath "${class_path}" org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000`
+ if [[ $? -ne 0 ]]; then
+ echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
+ # Print the output in case the user redirect the log to console.
+ echo "$output" 1>&2
+ exit 1
+ fi
+
+ echo "$output"
+}
+
+extractExecutionResults() {
+ local output="$1"
+ local expected_lines="$2"
+ local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
+ local execution_results
+ local num_lines
+
+ execution_results=$(echo "${output}" | grep ${EXECUTION_PREFIX})
+ num_lines=$(echo "${execution_results}" | wc -l)
+ # explicit check for empty result, becuase if execution_results is empty, then wc returns 1
+ if [[ -z ${execution_results} ]]; then
+ echo "[ERROR] The execution result is empty." 1>&2
+ exit 1
+ fi
+ if [[ ${num_lines} -ne ${expected_lines} ]]; then
+ echo "[ERROR] The execution results has unexpected number of lines, expected: ${expected_lines}, actual: ${num_lines}." 1>&2
+ echo "[ERROR] An execution result line is expected following the prefix '${EXECUTION_PREFIX}'" 1>&2
+ echo "$output" 1>&2
+ exit 1
+ fi
+
+ echo "${execution_results//${EXECUTION_PREFIX}/}"
+}
+
+extractLoggingOutputs() {
+ local output="$1"
+ local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
+
+ echo "${output}" | grep -v ${EXECUTION_PREFIX}
+}
+
+parseJmJvmArgsAndExportLogs() {
+ java_utils_output=$(runBashJavaUtilsCmd GET_JM_RESOURCE_PARAMS "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "$@")
+ logging_output=$(extractLoggingOutputs "${java_utils_output}")
+ jvm_params=$(extractExecutionResults "${java_utils_output}" 1)
+
+ if [[ $? -ne 0 ]]; then
+ echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
+ echo "[ERROR] Raw output from BashJavaUtils:"
+ echo "$java_utils_output"
+ exit 1
+ fi
+
+ export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
+
+ export FLINK_INHERITED_LOGS="
+$FLINK_INHERITED_LOGS
+
+JM_RESOURCE_PARAMS extraction logs:
+jvm_params: $jvm_params
+logs: $logging_output
+"
+}
diff --git a/template/flink-distribution/bin/flink-console.sh b/template/flink-distribution/bin/flink-console.sh
index 16f97ad..8fc0b05 100755
--- a/template/flink-distribution/bin/flink-console.sh
+++ b/template/flink-distribution/bin/flink-console.sh
@@ -30,7 +30,7 @@
. "$bin"/config.sh
-case ${SERVICE} in
+case $SERVICE in
(taskexecutor)
CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
;;
@@ -48,7 +48,7 @@
;;
(standalonejob)
- CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint
+ CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
;;
(statefun)
@@ -63,7 +63,35 @@
FLINK_TM_CLASSPATH=`constructFlinkClassPath`
-log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+ FLINK_IDENT_STRING="$USER"
+fi
+
+pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$SERVICE.pid
+mkdir -p "$FLINK_PID_DIR"
+# The lock needs to be released after use because this script is started foreground
+command -v flock >/dev/null 2>&1
+flock_exist=$?
+if [[ ${flock_exist} -eq 0 ]]; then
+ exec 200<"$FLINK_PID_DIR"
+ flock 200
+fi
+# Remove the pid file when all the processes are dead
+if [ -f "$pid" ]; then
+ all_dead=0
+ while read each_pid; do
+ # Check whether the process is still running
+ kill -0 $each_pid > /dev/null 2>&1
+ [[ $? -eq 0 ]] && all_dead=1
+ done < "$pid"
+ [ ${all_dead} -eq 0 ] && rm $pid
+fi
+id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0")
+
+FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${SERVICE}-${id}-${HOSTNAME}"
+log="${FLINK_LOG_PREFIX}.log"
+
+log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")
JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
@@ -75,4 +103,11 @@
fi
echo "Starting $SERVICE as a console application on host $HOSTNAME."
-exec $JAVA_RUN ${JVM_ARGS} ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
+
+# Add the current process id to pid file
+echo $$ >> "$pid" 2>/dev/null
+
+# Release the lock because the java process runs in the foreground and would block other processes from modifying the pid file
+[[ ${flock_exist} -eq 0 ]] && flock -u 200
+
+exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
diff --git a/template/flink-distribution/bin/standalone-job.sh b/template/flink-distribution/bin/standalone-job.sh
index e9def43..f7709e9 100755
--- a/template/flink-distribution/bin/standalone-job.sh
+++ b/template/flink-distribution/bin/standalone-job.sh
@@ -24,8 +24,8 @@
STARTSTOP=$1
ENTRY_POINT_NAME="statefun"
-if [[ ${STARTSTOP} != "start" ]] && [[ ${STARTSTOP} != "start-foreground" ]] && [[ ${STARTSTOP} != "stop" ]]; then
- echo ${USAGE}
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]]; then
+ echo $USAGE
exit 1
fi
@@ -37,25 +37,10 @@
# Startup parameters
ARGS=("--configDir" "${FLINK_CONF_DIR}" "${@:2}")
-if [[ ${STARTSTOP} == "start" ]] || [[ ${STARTSTOP} == "start-foreground" ]]; then
- if [ ! -z "${FLINK_JM_HEAP_MB}" ] && [ "${FLINK_JM_HEAP}" == 0 ]; then
- echo "used deprecated key \`${KEY_JOBM_MEM_MB}\`, please replace with key \`${KEY_JOBM_MEM_SIZE}\`"
- else
- flink_jm_heap_bytes=$(parseBytes ${FLINK_JM_HEAP})
- FLINK_JM_HEAP_MB=$(getMebiBytes ${flink_jm_heap_bytes})
- fi
-
- if [[ ! ${FLINK_JM_HEAP_MB} =~ $IS_NUMBER ]] || [[ "${FLINK_JM_HEAP_MB}" -lt "0" ]]; then
- echo "[ERROR] Configured memory size is not a valid value. Please set '${KEY_JOBM_MEM_SIZE}' in ${FLINK_CONF_FILE}."
- exit 1
- fi
-
- if [ "${FLINK_JM_HEAP_MB}" -gt "0" ]; then
- export JVM_ARGS="$JVM_ARGS -Xms"$FLINK_JM_HEAP_MB"m -Xmx"$FLINK_JM_HEAP_MB"m"
- fi
-
+if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
# Add cluster entry point specific JVM options
export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
+ parseJmJvmArgsAndExportLogs "${ARGS[@]}"
fi
if [[ $STARTSTOP == "start-foreground" ]]; then
diff --git a/template/flink-distribution/bin/taskmanager.sh b/template/flink-distribution/bin/taskmanager.sh
new file mode 100755
index 0000000..dcfca5c
--- /dev/null
+++ b/template/flink-distribution/bin/taskmanager.sh
@@ -0,0 +1,103 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Backported due to FLINK-18639
+# This file can be removed when upgrading
+# to Flink 1.11.2 or 1.12
+
+# Start/stop a Flink TaskManager.
+USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"
+
+STARTSTOP=$1
+
+ARGS=("${@:2}")
+
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
+ echo $USAGE
+ exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+ENTRYPOINT=taskexecutor
+
+if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
+
+ # if no other JVM options are set, set the GC to G1
+ if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
+ export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
+ fi
+
+ # Add TaskManager-specific JVM options
+ export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
+
+ # Startup parameters
+
+ java_utils_output=$(runBashJavaUtilsCmd GET_TM_RESOURCE_PARAMS "${FLINK_CONF_DIR}" "$FLINK_BIN_DIR/bash-java-utils.jar:$(findFlinkDistJar)" "${ARGS[@]}")
+
+ logging_output=$(extractLoggingOutputs "${java_utils_output}")
+ params_output=$(extractExecutionResults "${java_utils_output}" 2)
+
+ if [[ $? -ne 0 ]]; then
+ echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
+ echo "[ERROR] Raw output from BashJavaUtils:"
+ echo "$java_utils_output"
+ exit 1
+ fi
+
+ jvm_params=$(echo "${params_output}" | head -n 1)
+ export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
+
+ IFS=$" " dynamic_configs=$(echo "${params_output}" | tail -n 1)
+ ARGS+=("--configDir" "${FLINK_CONF_DIR}" ${dynamic_configs[@]})
+
+ export FLINK_INHERITED_LOGS="
+$FLINK_INHERITED_LOGS
+
+TM_RESOURCE_PARAMS extraction logs:
+jvm_params: $jvm_params
+dynamic_configs: $dynamic_configs
+logs: $logging_output
+"
+fi
+
+if [[ $STARTSTOP == "start-foreground" ]]; then
+ exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
+else
+ if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
+ # Start a single TaskManager
+ "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
+ else
+ # Example output from `numactl --show` on an AWS c4.8xlarge:
+ # policy: default
+ # preferred node: current
+ # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
+ # cpubind: 0 1
+ # nodebind: 0 1
+ # membind: 0 1
+ read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
+ for NODE_ID in "${NODE_LIST[@]:1}"; do
+ # Start a TaskManager for each NUMA node
+ numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
+ done
+ fi
+fi