[release] Update Dockerfiles for 2.2.0 release
diff --git a/2.2.0-java11/Dockerfile b/2.2.0-java11/Dockerfile
new file mode 100644
index 0000000..5f44b00
--- /dev/null
+++ b/2.2.0-java11/Dockerfile
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM flink:1.11.1-scala_2.12-java11
+
+ENV STATEFUN_VERSION=2.2.0 \
+ GPG_KEY=1C1E2394D3194E1944613488F320986D35C33D6A
+
+ENV ROLE=worker
+ENV MASTER_HOST=localhost
+ENV STATEFUN_HOME=/opt/statefun
+ENV STATEFUN_MODULES=$STATEFUN_HOME/modules
+
+# Cleanup flink-lib
+RUN rm -fr $FLINK_HOME/lib/flink-table*jar
+
+# Copy our distribution template
+COPY flink-distribution/ $FLINK_HOME/
+
+# Install Stateful Functions dependencies in Flink lib
+ENV DIST_JAR_URL=https://repo.maven.apache.org/maven2/org/apache/flink/statefun-flink-distribution/${STATEFUN_VERSION}/statefun-flink-distribution-${STATEFUN_VERSION}.jar \
+ DIST_ASC_URL=https://repo.maven.apache.org/maven2/org/apache/flink/statefun-flink-distribution/${STATEFUN_VERSION}/statefun-flink-distribution-${STATEFUN_VERSION}.jar.asc \
+ CORE_JAR_URL=https://repo.maven.apache.org/maven2/org/apache/flink/statefun-flink-core/${STATEFUN_VERSION}/statefun-flink-core-${STATEFUN_VERSION}.jar \
+ CORE_ASC_URL=https://repo.maven.apache.org/maven2/org/apache/flink/statefun-flink-core/${STATEFUN_VERSION}/statefun-flink-core-${STATEFUN_VERSION}.jar.asc
+
+RUN set -ex; \
+  wget -nv -O statefun-flink-distribution.jar "$DIST_JAR_URL"; \
+  wget -nv -O statefun-flink-distribution.jar.asc "$DIST_ASC_URL"; \
+  wget -nv -O statefun-flink-core.jar "$CORE_JAR_URL"; \
+  wget -nv -O statefun-flink-core.jar.asc "$CORE_ASC_URL"; \
+  \
+  export GNUPGHOME="$(mktemp -d)"; \
+  for server in keyserver.ubuntu.com $(shuf -e \
+          hkp://keyserver.ubuntu.com:80 \
+          keys.openpgp.org \
+          hkp://pgp.mit.edu:80 \
+          pgp.mit.edu) ; do \
+      gpg --batch --keyserver "$server" --recv-keys "$GPG_KEY" && break || : ; \
+  done && \
+  gpg --batch --verify statefun-flink-distribution.jar.asc statefun-flink-distribution.jar; \
+  gpg --batch --verify statefun-flink-core.jar.asc statefun-flink-core.jar; \
+  gpgconf --kill all; \
+  rm -rf "$GNUPGHOME" statefun-flink-distribution.jar.asc statefun-flink-core.jar.asc; \
+  \
+  mkdir -p $FLINK_HOME/lib; \
+  mv statefun-flink-distribution.jar $FLINK_HOME/lib; \
+  mv statefun-flink-core.jar $FLINK_HOME/lib;
+
+# add user modules
+USER root
+
+RUN mkdir -p $STATEFUN_MODULES && \
+ useradd --system --home-dir $STATEFUN_HOME --uid=9998 --gid=flink statefun && \
+ chown -R statefun:flink $STATEFUN_HOME && \
+ chmod -R g+rw $STATEFUN_HOME
+
+# add filesystem plugins
+RUN mkdir -p $FLINK_HOME/plugins/s3-fs-presto && \
+ mv $FLINK_HOME/opt/flink-s3-fs-presto-1.11.1.jar $FLINK_HOME/plugins/s3-fs-presto
+RUN mkdir -p $FLINK_HOME/plugins/swift-fs-hadoop && \
+ mv $FLINK_HOME/opt/flink-swift-fs-hadoop-1.11.1.jar $FLINK_HOME/plugins/swift-fs-hadoop
+RUN mkdir -p $FLINK_HOME/plugins/oss-fs-hadoop && \
+ mv $FLINK_HOME/opt/flink-oss-fs-hadoop-1.11.1.jar $FLINK_HOME/plugins/oss-fs-hadoop
+RUN mkdir -p $FLINK_HOME/plugins/azure-fs-hadoop && \
+ mv $FLINK_HOME/opt/flink-azure-fs-hadoop-1.11.1.jar $FLINK_HOME/plugins/azure-fs-hadoop
+
+# entry point (COPY preferred over ADD for plain local files; mode bits are preserved)
+COPY docker-entry-point.sh /docker-entry-point.sh
+
+ENTRYPOINT ["/docker-entry-point.sh"]
diff --git a/2.2.0-java11/docker-entry-point.sh b/2.2.0-java11/docker-entry-point.sh
new file mode 100755
index 0000000..4e502b4
--- /dev/null
+++ b/2.2.0-java11/docker-entry-point.sh
@@ -0,0 +1,56 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+#
+# Role types
+#
+WORKER="worker"
+MASTER="master"
+
+#
+# Environment
+#
+FLINK_HOME=${FLINK_HOME:-"/opt/flink/bin"}
+ROLE=${ROLE:-"worker"}
+MASTER_HOST=${MASTER_HOST:-"localhost"}
+
+#
+# Start a service depending on the role.
+#
+if [[ "${ROLE}" == "${WORKER}" ]]; then
+ #
+ # start the TaskManager (worker role)
+ #
+ exec ${FLINK_HOME}/bin/taskmanager.sh start-foreground \
+ -Djobmanager.rpc.address=${MASTER_HOST}
+
+elif [[ "${ROLE}" == "${MASTER}" ]]; then
+ #
+ # start the JobManager (master role) with our predefined job.
+ #
+ exec $FLINK_HOME/bin/standalone-job.sh \
+ start-foreground \
+ -Djobmanager.rpc.address=${MASTER_HOST} \
+ "$@"
+else
+ #
+ # unknown role
+ #
+ echo "unknown role ${ROLE}"
+ exit 1
+fi
diff --git a/2.2.0-java11/flink-distribution/bin/config.sh b/2.2.0-java11/flink-distribution/bin/config.sh
new file mode 100755
index 0000000..268f3fc
--- /dev/null
+++ b/2.2.0-java11/flink-distribution/bin/config.sh
@@ -0,0 +1,574 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Backported due to FLINK-18639
+# This file can be removed when upgrading
+# to Flink 1.11.2 or 1.12
+
+constructFlinkClassPath() {
+ local FLINK_DIST
+ local FLINK_CLASSPATH
+
+ while read -d '' -r jarfile ; do
+ if [[ "$jarfile" =~ .*/flink-dist[^/]*.jar$ ]]; then
+ FLINK_DIST="$FLINK_DIST":"$jarfile"
+ elif [[ "$FLINK_CLASSPATH" == "" ]]; then
+ FLINK_CLASSPATH="$jarfile";
+ else
+ FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
+ fi
+ done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)
+
+ if [[ "$FLINK_DIST" == "" ]]; then
+ # write error message to stderr since stdout is stored as the classpath
+ (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
+
+ # exit function with empty classpath to force process failure
+ exit 1
+ fi
+
+ echo "$FLINK_CLASSPATH""$FLINK_DIST"
+}
+
+findFlinkDistJar() {
+ local FLINK_DIST="`find "$FLINK_LIB_DIR" -name 'flink-dist*.jar'`"
+
+ if [[ "$FLINK_DIST" == "" ]]; then
+ # write error message to stderr since stdout is stored as the classpath
+ (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
+
+ # exit function with empty classpath to force process failure
+ exit 1
+ fi
+
+ echo "$FLINK_DIST"
+}
+
+# These are used to mangle paths that are passed to java when using
+# cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere
+# but the windows java version expects them in Windows Format, i.e. C:\bla\blub.
+# "cygpath" can do the conversion.
+manglePath() {
+ UNAME=$(uname -s)
+ if [ "${UNAME:0:6}" == "CYGWIN" ]; then
+ echo `cygpath -w "$1"`
+ else
+ echo $1
+ fi
+}
+
+manglePathList() {
+ UNAME=$(uname -s)
+ # a path list, for example a java classpath
+ if [ "${UNAME:0:6}" == "CYGWIN" ]; then
+ echo `cygpath -wp "$1"`
+ else
+ echo $1
+ fi
+}
+
+# Looks up a config value by key from a simple YAML-style key-value map.
+# $1: key to look up
+# $2: default value to return if key does not exist
+# $3: config file to read from
+readFromConfig() {
+ local key=$1
+ local defaultValue=$2
+ local configFile=$3
+
+ # first extract the value with the given key (1st sed), then trim the result (2nd sed)
+ # if a key exists multiple times, take the "last" one (tail)
+ local value=`sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1`
+
+ [ -z "$value" ] && echo "$defaultValue" || echo "$value"
+}
+
+########################################################################################################################
+# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/flink-conf.yaml
+# -or- the respective environment variables are not set.
+########################################################################################################################
+
+
+# WARNING !!!, these values are only used if nothing else is specified in
+# conf/flink-conf.yaml
+
+DEFAULT_ENV_PID_DIR="/tmp" # Directory to store *.pid files to
+DEFAULT_ENV_LOG_MAX=5 # Maximum number of old log files to keep
+DEFAULT_ENV_JAVA_OPTS="" # Optional JVM args
+DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args (JobManager)
+DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args (TaskManager)
+DEFAULT_ENV_JAVA_OPTS_HS="" # Optional JVM args (HistoryServer)
+DEFAULT_ENV_JAVA_OPTS_CLI="" # Optional JVM args (Client)
+DEFAULT_ENV_SSH_OPTS="" # Optional SSH parameters running in cluster mode
+DEFAULT_YARN_CONF_DIR="" # YARN Configuration Directory, if necessary
+DEFAULT_HADOOP_CONF_DIR="" # Hadoop Configuration Directory, if necessary
+DEFAULT_HBASE_CONF_DIR="" # HBase Configuration Directory, if necessary
+
+########################################################################################################################
+# CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml
+########################################################################################################################
+
+KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"
+
+KEY_ENV_PID_DIR="env.pid.dir"
+KEY_ENV_LOG_DIR="env.log.dir"
+KEY_ENV_LOG_MAX="env.log.max"
+KEY_ENV_YARN_CONF_DIR="env.yarn.conf.dir"
+KEY_ENV_HADOOP_CONF_DIR="env.hadoop.conf.dir"
+KEY_ENV_HBASE_CONF_DIR="env.hbase.conf.dir"
+KEY_ENV_JAVA_HOME="env.java.home"
+KEY_ENV_JAVA_OPTS="env.java.opts"
+KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
+KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
+KEY_ENV_JAVA_OPTS_HS="env.java.opts.historyserver"
+KEY_ENV_JAVA_OPTS_CLI="env.java.opts.client"
+KEY_ENV_SSH_OPTS="env.ssh.opts"
+KEY_HIGH_AVAILABILITY="high-availability"
+KEY_ZK_HEAP_MB="zookeeper.heap.mb"
+
+########################################################################################################################
+# PATHS AND CONFIG
+########################################################################################################################
+
+target="$0"
+# For the case, the executable has been directly symlinked, figure out
+# the correct bin path by following its symlink up to an upper bound.
+# Note: we can't use the readlink utility here if we want to be POSIX
+# compatible.
+iteration=0
+while [ -L "$target" ]; do
+ if [ "$iteration" -gt 100 ]; then
+ echo "Cannot resolve path: You have a cyclic symlink in $target."
+ break
+ fi
+ ls=`ls -ld -- "$target"`
+ target=`expr "$ls" : '.* -> \(.*\)$'`
+ iteration=$((iteration + 1))
+done
+
+# Convert relative path to absolute path and resolve directory symlinks
+bin=`dirname "$target"`
+SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`
+
+# Define the main directory of the flink installation
+# If config.sh is called by pyflink-shell.sh in python bin directory(pip installed), then do not need to set the FLINK_HOME here.
+if [ -z "$_FLINK_HOME_DETERMINED" ]; then
+ FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`
+fi
+FLINK_LIB_DIR=$FLINK_HOME/lib
+FLINK_PLUGINS_DIR=$FLINK_HOME/plugins
+FLINK_OPT_DIR=$FLINK_HOME/opt
+
+
+# These need to be mangled because they are directly passed to java.
+# The above lib path is used by the shell script to retrieve jars in a
+# directory, so it needs to be unmangled.
+FLINK_HOME_DIR_MANGLED=`manglePath "$FLINK_HOME"`
+if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_HOME_DIR_MANGLED/conf; fi
+FLINK_BIN_DIR=$FLINK_HOME_DIR_MANGLED/bin
+DEFAULT_FLINK_LOG_DIR=$FLINK_HOME_DIR_MANGLED/log
+FLINK_CONF_FILE="flink-conf.yaml"
+YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}
+
+### Exported environment variables ###
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_PLUGINS_DIR
+# export /lib dir to access it during deployment of the Yarn staging files
+export FLINK_LIB_DIR
+# export /opt dir to access it for the SQL client
+export FLINK_OPT_DIR
+
+########################################################################################################################
+# ENVIRONMENT VARIABLES
+########################################################################################################################
+
+# read JAVA_HOME from config with no default value
+MY_JAVA_HOME=$(readFromConfig ${KEY_ENV_JAVA_HOME} "" "${YAML_CONF}")
+# check if config specified JAVA_HOME
+if [ -z "${MY_JAVA_HOME}" ]; then
+ # config did not specify JAVA_HOME. Use system JAVA_HOME
+ MY_JAVA_HOME=${JAVA_HOME}
+fi
+# check if we have a valid JAVA_HOME and if java is not available
+if [ -z "${MY_JAVA_HOME}" ] && ! type java > /dev/null 2> /dev/null; then
+ echo "Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME."
+ exit 1
+else
+ JAVA_HOME=${MY_JAVA_HOME}
+fi
+
+UNAME=$(uname -s)
+if [ "${UNAME:0:6}" == "CYGWIN" ]; then
+ JAVA_RUN=java
+else
+ if [[ -d $JAVA_HOME ]]; then
+ JAVA_RUN=$JAVA_HOME/bin/java
+ else
+ JAVA_RUN=java
+ fi
+fi
+
+# Define HOSTNAME if it is not already set
+if [ -z "${HOSTNAME}" ]; then
+ HOSTNAME=`hostname`
+fi
+
+IS_NUMBER="^[0-9]+$"
+
+# Verify that NUMA tooling is available
+command -v numactl >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+ FLINK_TM_COMPUTE_NUMA="false"
+else
+ # Define FLINK_TM_COMPUTE_NUMA if it is not already set
+ if [ -z "${FLINK_TM_COMPUTE_NUMA}" ]; then
+ FLINK_TM_COMPUTE_NUMA=$(readFromConfig ${KEY_TASKM_COMPUTE_NUMA} "false" "${YAML_CONF}")
+ fi
+fi
+
+if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
+ MAX_LOG_FILE_NUMBER=$(readFromConfig ${KEY_ENV_LOG_MAX} ${DEFAULT_ENV_LOG_MAX} "${YAML_CONF}")
+fi
+
+if [ -z "${FLINK_LOG_DIR}" ]; then
+ FLINK_LOG_DIR=$(readFromConfig ${KEY_ENV_LOG_DIR} "${DEFAULT_FLINK_LOG_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${YARN_CONF_DIR}" ]; then
+ YARN_CONF_DIR=$(readFromConfig ${KEY_ENV_YARN_CONF_DIR} "${DEFAULT_YARN_CONF_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${HADOOP_CONF_DIR}" ]; then
+ HADOOP_CONF_DIR=$(readFromConfig ${KEY_ENV_HADOOP_CONF_DIR} "${DEFAULT_HADOOP_CONF_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${HBASE_CONF_DIR}" ]; then
+ HBASE_CONF_DIR=$(readFromConfig ${KEY_ENV_HBASE_CONF_DIR} "${DEFAULT_HBASE_CONF_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${FLINK_PID_DIR}" ]; then
+ FLINK_PID_DIR=$(readFromConfig ${KEY_ENV_PID_DIR} "${DEFAULT_ENV_PID_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
+ FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")
+
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
+ FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
+ FLINK_ENV_JAVA_OPTS_TM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_TM} "${DEFAULT_ENV_JAVA_OPTS_TM}" "${YAML_CONF}")
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS_TM="$( echo "${FLINK_ENV_JAVA_OPTS_TM}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS_HS}" ]; then
+ FLINK_ENV_JAVA_OPTS_HS=$(readFromConfig ${KEY_ENV_JAVA_OPTS_HS} "${DEFAULT_ENV_JAVA_OPTS_HS}" "${YAML_CONF}")
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS_HS="$( echo "${FLINK_ENV_JAVA_OPTS_HS}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS_CLI}" ]; then
+ FLINK_ENV_JAVA_OPTS_CLI=$(readFromConfig ${KEY_ENV_JAVA_OPTS_CLI} "${DEFAULT_ENV_JAVA_OPTS_CLI}" "${YAML_CONF}")
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS_CLI="$( echo "${FLINK_ENV_JAVA_OPTS_CLI}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_SSH_OPTS}" ]; then
+ FLINK_SSH_OPTS=$(readFromConfig ${KEY_ENV_SSH_OPTS} "${DEFAULT_ENV_SSH_OPTS}" "${YAML_CONF}")
+fi
+
+# Define ZK_HEAP if it is not already set
+if [ -z "${ZK_HEAP}" ]; then
+ ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
+fi
+
+# High availability
+if [ -z "${HIGH_AVAILABILITY}" ]; then
+ HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
+ if [ -z "${HIGH_AVAILABILITY}" ]; then
+ # Try deprecated value
+ DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
+ if [ -z "${DEPRECATED_HA}" ]; then
+ HIGH_AVAILABILITY="none"
+ elif [ ${DEPRECATED_HA} == "standalone" ]; then
+ # Standalone is now 'none'
+ HIGH_AVAILABILITY="none"
+ else
+ HIGH_AVAILABILITY=${DEPRECATED_HA}
+ fi
+ fi
+fi
+
+# Arguments for the JVM. Used for job and task manager JVMs.
+# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
+# JobManagerOptions#TOTAL_PROCESS_MEMORY and TaskManagerOptions#TOTAL_PROCESS_MEMORY for that!
+if [ -z "${JVM_ARGS}" ]; then
+ JVM_ARGS=""
+fi
+
+# Check if deprecated HADOOP_HOME is set, and specify config path to HADOOP_CONF_DIR if it's empty.
+if [ -z "$HADOOP_CONF_DIR" ]; then
+ if [ -n "$HADOOP_HOME" ]; then
+    # HADOOP_HOME is set. Check if it's a Hadoop 1.x or 2.x HADOOP_HOME path
+ if [ -d "$HADOOP_HOME/conf" ]; then
+ # It's Hadoop 1.x
+ HADOOP_CONF_DIR="$HADOOP_HOME/conf"
+ fi
+ if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
+ # It's Hadoop 2.2+
+ HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
+ fi
+ fi
+fi
+
+# if neither HADOOP_CONF_DIR nor HADOOP_CLASSPATH are set, use some common default (if available)
+if [ -z "$HADOOP_CONF_DIR" ] && [ -z "$HADOOP_CLASSPATH" ]; then
+ if [ -d "/etc/hadoop/conf" ]; then
+ echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set."
+ HADOOP_CONF_DIR="/etc/hadoop/conf"
+ fi
+fi
+
+# Check if deprecated HBASE_HOME is set, and specify config path to HBASE_CONF_DIR if it's empty.
+if [ -z "$HBASE_CONF_DIR" ]; then
+ if [ -n "$HBASE_HOME" ]; then
+ # HBASE_HOME is set.
+ if [ -d "$HBASE_HOME/conf" ]; then
+ HBASE_CONF_DIR="$HBASE_HOME/conf"
+ fi
+ fi
+fi
+
+# try and set HBASE_CONF_DIR to some common default if it's not set
+if [ -z "$HBASE_CONF_DIR" ]; then
+ if [ -d "/etc/hbase/conf" ]; then
+ echo "Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set."
+ HBASE_CONF_DIR="/etc/hbase/conf"
+ fi
+fi
+
+INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
+
+if [ -n "${HBASE_CONF_DIR}" ]; then
+ INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:${HBASE_CONF_DIR}"
+fi
+
+# Auxiliary function which extracts the name of host from a line which
+# also potentially includes topology information and the taskManager type
+extractHostName() {
+ # handle comments: extract first part of string (before first # character)
+ WORKER=`echo $1 | cut -d'#' -f 1`
+
+ # Extract the hostname from the network hierarchy
+ if [[ "$WORKER" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then
+ WORKER=${BASH_REMATCH[1]}
+ fi
+
+ echo $WORKER
+}
+
+# Auxiliary functions for log file rotation
+rotateLogFilesWithPrefix() {
+ dir=$1
+ prefix=$2
+ while read -r log ; do
+ rotateLogFile "$log"
+ # find distinct set of log file names, ignoring the rotation number (trailing dot and digit)
+ done < <(find "$dir" ! -type d -path "${prefix}*" | sed s/\.[0-9][0-9]*$// | sort | uniq)
+}
+
+rotateLogFile() {
+ log=$1;
+ num=$MAX_LOG_FILE_NUMBER
+ if [ -f "$log" -a "$num" -gt 0 ]; then
+ while [ $num -gt 1 ]; do
+ prev=`expr $num - 1`
+ [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
+ num=$prev
+ done
+ mv "$log" "$log.$num";
+ fi
+}
+
+readMasters() {
+ MASTERS_FILE="${FLINK_CONF_DIR}/masters"
+
+ if [[ ! -f "${MASTERS_FILE}" ]]; then
+ echo "No masters file. Please specify masters in 'conf/masters'."
+ exit 1
+ fi
+
+ MASTERS=()
+ WEBUIPORTS=()
+
+ MASTERS_ALL_LOCALHOST=true
+ GOON=true
+ while $GOON; do
+ read line || GOON=false
+ HOSTWEBUIPORT=$( extractHostName $line)
+
+ if [ -n "$HOSTWEBUIPORT" ]; then
+ HOST=$(echo $HOSTWEBUIPORT | cut -f1 -d:)
+ WEBUIPORT=$(echo $HOSTWEBUIPORT | cut -s -f2 -d:)
+ MASTERS+=(${HOST})
+
+ if [ -z "$WEBUIPORT" ]; then
+ WEBUIPORTS+=(0)
+ else
+ WEBUIPORTS+=(${WEBUIPORT})
+ fi
+
+ if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
+ MASTERS_ALL_LOCALHOST=false
+ fi
+ fi
+ done < "$MASTERS_FILE"
+}
+
+readWorkers() {
+ WORKERS_FILE="${FLINK_CONF_DIR}/workers"
+
+ if [[ ! -f "$WORKERS_FILE" ]]; then
+ echo "No workers file. Please specify workers in 'conf/workers'."
+ exit 1
+ fi
+
+ WORKERS=()
+
+ WORKERS_ALL_LOCALHOST=true
+ GOON=true
+ while $GOON; do
+ read line || GOON=false
+ HOST=$( extractHostName $line)
+ if [ -n "$HOST" ] ; then
+ WORKERS+=(${HOST})
+ if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
+ WORKERS_ALL_LOCALHOST=false
+ fi
+ fi
+ done < "$WORKERS_FILE"
+}
+
+# starts or stops TMs on all workers
+# TMWorkers start|stop
+TMWorkers() {
+ CMD=$1
+
+ readWorkers
+
+ if [ ${WORKERS_ALL_LOCALHOST} = true ] ; then
+ # all-local setup
+ for worker in ${WORKERS[@]}; do
+ "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
+ done
+ else
+ # non-local setup
+ # start/stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
+ command -v pdsh >/dev/null 2>&1
+ if [[ $? -ne 0 ]]; then
+ for worker in ${WORKERS[@]}; do
+ ssh -n $FLINK_SSH_OPTS $worker -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
+ done
+ else
+ PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${WORKERS[*]}") \
+ "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
+ fi
+ fi
+}
+
+runBashJavaUtilsCmd() {
+ local cmd=$1
+ local conf_dir=$2
+ local class_path=$3
+ local dynamic_args=${@:4}
+ class_path=`manglePathList "${class_path}"`
+
+ local output=`${JAVA_RUN} -classpath "${class_path}" org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000`
+ if [[ $? -ne 0 ]]; then
+ echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
+ # Print the output in case the user redirect the log to console.
+ echo "$output" 1>&2
+ exit 1
+ fi
+
+ echo "$output"
+}
+
+extractExecutionResults() {
+ local output="$1"
+ local expected_lines="$2"
+ local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
+ local execution_results
+ local num_lines
+
+ execution_results=$(echo "${output}" | grep ${EXECUTION_PREFIX})
+ num_lines=$(echo "${execution_results}" | wc -l)
+    # explicit check for empty result, because if execution_results is empty, then wc returns 1
+ if [[ -z ${execution_results} ]]; then
+ echo "[ERROR] The execution result is empty." 1>&2
+ exit 1
+ fi
+ if [[ ${num_lines} -ne ${expected_lines} ]]; then
+ echo "[ERROR] The execution results has unexpected number of lines, expected: ${expected_lines}, actual: ${num_lines}." 1>&2
+ echo "[ERROR] An execution result line is expected following the prefix '${EXECUTION_PREFIX}'" 1>&2
+ echo "$output" 1>&2
+ exit 1
+ fi
+
+ echo "${execution_results//${EXECUTION_PREFIX}/}"
+}
+
+extractLoggingOutputs() {
+ local output="$1"
+ local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
+
+ echo "${output}" | grep -v ${EXECUTION_PREFIX}
+}
+
+parseJmJvmArgsAndExportLogs() {
+ java_utils_output=$(runBashJavaUtilsCmd GET_JM_RESOURCE_PARAMS "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "$@")
+ logging_output=$(extractLoggingOutputs "${java_utils_output}")
+ jvm_params=$(extractExecutionResults "${java_utils_output}" 1)
+
+ if [[ $? -ne 0 ]]; then
+ echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
+ echo "[ERROR] Raw output from BashJavaUtils:"
+ echo "$java_utils_output"
+ exit 1
+ fi
+
+ export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
+
+ export FLINK_INHERITED_LOGS="
+$FLINK_INHERITED_LOGS
+
+JM_RESOURCE_PARAMS extraction logs:
+jvm_params: $jvm_params
+logs: $logging_output
+"
+}
diff --git a/2.2.0-java11/flink-distribution/bin/flink-console.sh b/2.2.0-java11/flink-distribution/bin/flink-console.sh
new file mode 100755
index 0000000..8fc0b05
--- /dev/null
+++ b/2.2.0-java11/flink-distribution/bin/flink-console.sh
@@ -0,0 +1,113 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# This file was taken from Apache Flink, and modified to include another entry point
+
+# Start a Flink service as a console application. Must be stopped with Ctrl-C
+# or with SIGTERM by kill or the controlling process.
+USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|statefun) [args]"
+
+SERVICE=$1
+ARGS=("${@:2}") # get remaining arguments as array
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+case $SERVICE in
+ (taskexecutor)
+ CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
+ ;;
+
+ (historyserver)
+ CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
+ ;;
+
+ (zookeeper)
+ CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
+ ;;
+
+ (standalonesession)
+ CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
+ ;;
+
+ (standalonejob)
+ CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
+ ;;
+
+ (statefun)
+ CLASS_TO_RUN=org.apache.flink.statefun.flink.launcher.StatefulFunctionsClusterEntryPoint
+ ;;
+
+ (*)
+ echo "Unknown service '${SERVICE}'. $USAGE."
+ exit 1
+ ;;
+esac
+
+FLINK_TM_CLASSPATH=`constructFlinkClassPath`
+
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+ FLINK_IDENT_STRING="$USER"
+fi
+
+pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$SERVICE.pid
+mkdir -p "$FLINK_PID_DIR"
+# The lock needs to be released after use because this script is started in the foreground
+command -v flock >/dev/null 2>&1
+flock_exist=$?
+if [[ ${flock_exist} -eq 0 ]]; then
+ exec 200<"$FLINK_PID_DIR"
+ flock 200
+fi
+# Remove the pid file when all the processes are dead
+if [ -f "$pid" ]; then
+ all_dead=0
+ while read each_pid; do
+ # Check whether the process is still running
+ kill -0 $each_pid > /dev/null 2>&1
+ [[ $? -eq 0 ]] && all_dead=1
+ done < "$pid"
+ [ ${all_dead} -eq 0 ] && rm $pid
+fi
+id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0")
+
+FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${SERVICE}-${id}-${HOSTNAME}"
+log="${FLINK_LOG_PREFIX}.log"
+
+log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")
+
+JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
+
+# Only set JVM 8 arguments if we have correctly extracted the version
+if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
+ if [ "$JAVA_VERSION" -lt 18 ]; then
+ JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
+ fi
+fi
+
+echo "Starting $SERVICE as a console application on host $HOSTNAME."
+
+# Add the current process id to pid file
+echo $$ >> "$pid" 2>/dev/null
+
+# Release the lock because the java process runs in the foreground and would block other processes from modifying the pid file
+[[ ${flock_exist} -eq 0 ]] && flock -u 200
+
+exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
diff --git a/2.2.0-java11/flink-distribution/bin/standalone-job.sh b/2.2.0-java11/flink-distribution/bin/standalone-job.sh
new file mode 100755
index 0000000..f7709e9
--- /dev/null
+++ b/2.2.0-java11/flink-distribution/bin/standalone-job.sh
@@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# This file was taken from Apache Flink, and modified to include another entry point
+
+# Start/stop a Flink JobManager.
+USAGE="Usage: standalone-job.sh ((start|start-foreground))|stop [args]"
+
+STARTSTOP=$1
+ENTRY_POINT_NAME="statefun"
+
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]]; then
+ echo $USAGE
+ exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# Startup parameters
+ARGS=("--configDir" "${FLINK_CONF_DIR}" "${@:2}")
+
+if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
+ # Add cluster entry point specific JVM options
+ export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
+ parseJmJvmArgsAndExportLogs "${ARGS[@]}"
+fi
+
+if [[ $STARTSTOP == "start-foreground" ]]; then
+ exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}"
+else
+ "${FLINK_BIN_DIR}"/flink-daemon.sh ${STARTSTOP} ${ENTRY_POINT_NAME} "${ARGS[@]}"
+fi
diff --git a/2.2.0-java11/flink-distribution/bin/taskmanager.sh b/2.2.0-java11/flink-distribution/bin/taskmanager.sh
new file mode 100755
index 0000000..dcfca5c
--- /dev/null
+++ b/2.2.0-java11/flink-distribution/bin/taskmanager.sh
@@ -0,0 +1,103 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Backported due to FLINK-18639
+# This file can be removed when upgrading
+# to Flink 1.11.2 or 1.12
+
+# Start/stop a Flink TaskManager.
+USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"
+
+STARTSTOP=$1
+
+ARGS=("${@:2}")
+
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
+ echo $USAGE
+ exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+ENTRYPOINT=taskexecutor
+
+if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
+
+ # if no other JVM options are set, set the GC to G1
+ if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
+ export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
+ fi
+
+ # Add TaskManager-specific JVM options
+ export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
+
+ # Startup parameters
+
+ java_utils_output=$(runBashJavaUtilsCmd GET_TM_RESOURCE_PARAMS "${FLINK_CONF_DIR}" "$FLINK_BIN_DIR/bash-java-utils.jar:$(findFlinkDistJar)" "${ARGS[@]}")
+
+ logging_output=$(extractLoggingOutputs "${java_utils_output}")
+ params_output=$(extractExecutionResults "${java_utils_output}" 2)
+
+ if [[ $? -ne 0 ]]; then
+ echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
+ echo "[ERROR] Raw output from BashJavaUtils:"
+ echo "$java_utils_output"
+ exit 1
+ fi
+
+ jvm_params=$(echo "${params_output}" | head -n 1)
+ export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
+
+ IFS=$" " dynamic_configs=$(echo "${params_output}" | tail -n 1)
+ ARGS+=("--configDir" "${FLINK_CONF_DIR}" ${dynamic_configs[@]})
+
+ export FLINK_INHERITED_LOGS="
+$FLINK_INHERITED_LOGS
+
+TM_RESOURCE_PARAMS extraction logs:
+jvm_params: $jvm_params
+dynamic_configs: $dynamic_configs
+logs: $logging_output
+"
+fi
+
+if [[ $STARTSTOP == "start-foreground" ]]; then
+ exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
+else
+ if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
+ # Start a single TaskManager
+ "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
+ else
+ # Example output from `numactl --show` on an AWS c4.8xlarge:
+ # policy: default
+ # preferred node: current
+ # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
+ # cpubind: 0 1
+ # nodebind: 0 1
+ # membind: 0 1
+ read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
+ for NODE_ID in "${NODE_LIST[@]:1}"; do
+ # Start a TaskManager for each NUMA node
+ numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
+ done
+ fi
+fi
diff --git a/2.2.0-java11/flink-distribution/conf/flink-conf.yaml b/2.2.0-java11/flink-distribution/conf/flink-conf.yaml
new file mode 100644
index 0000000..8b9b4e4
--- /dev/null
+++ b/2.2.0-java11/flink-distribution/conf/flink-conf.yaml
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# This file is the base for the Apache Flink configuration
+
+statefun.flink-job-name: Statefun Application
+
+#==============================================================================
+# Configurations strictly required by Stateful Functions. Do not change.
+#==============================================================================
+
+classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
+
+#==============================================================================
+# Fault tolerance, checkpointing and recovery.
+# For more related configuration options, please see: https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#fault-tolerance
+#==============================================================================
+
+# Uncomment the below to enable checkpointing for your application
+#execution.checkpointing.mode: EXACTLY_ONCE
+#execution.checkpointing.interval: 5sec
+
+restart-strategy: fixed-delay
+restart-strategy.fixed-delay.attempts: 2147483647
+restart-strategy.fixed-delay.delay: 1sec
+
+state.backend: rocksdb
+state.backend.rocksdb.timer-service.factory: ROCKSDB
+state.checkpoints.dir: file:///checkpoint-dir
+state.backend.incremental: true
+
+#==============================================================================
+# Recommended memory configurations. Users may change according to their needs.
+#==============================================================================
+
+jobmanager.memory.process.size: 1g
+taskmanager.memory.process.size: 4g
diff --git a/2.2.0-java8/Dockerfile b/2.2.0-java8/Dockerfile
new file mode 100644
index 0000000..f6d1ca2
--- /dev/null
+++ b/2.2.0-java8/Dockerfile
@@ -0,0 +1,82 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+FROM flink:1.11.1-scala_2.12-java8
+
+ENV STATEFUN_VERSION=2.2.0 \
+ GPG_KEY=1C1E2394D3194E1944613488F320986D35C33D6A
+
+ENV ROLE worker
+ENV MASTER_HOST localhost
+ENV STATEFUN_HOME /opt/statefun
+ENV STATEFUN_MODULES $STATEFUN_HOME/modules
+
+# Cleanup flink-lib
+RUN rm -fr $FLINK_HOME/lib/flink-table*jar
+
+# Copy our distribution template
+COPY flink-distribution/ $FLINK_HOME/
+
+# Install Stateful Functions dependencies in Flink lib
+ENV DIST_JAR_URL=https://repo.maven.apache.org/maven2/org/apache/flink/statefun-flink-distribution/${STATEFUN_VERSION}/statefun-flink-distribution-${STATEFUN_VERSION}.jar \
+ DIST_ASC_URL=https://repo.maven.apache.org/maven2/org/apache/flink/statefun-flink-distribution/${STATEFUN_VERSION}/statefun-flink-distribution-${STATEFUN_VERSION}.jar.asc \
+ CORE_JAR_URL=https://repo.maven.apache.org/maven2/org/apache/flink/statefun-flink-core/${STATEFUN_VERSION}/statefun-flink-core-${STATEFUN_VERSION}.jar \
+ CORE_ASC_URL=https://repo.maven.apache.org/maven2/org/apache/flink/statefun-flink-core/${STATEFUN_VERSION}/statefun-flink-core-${STATEFUN_VERSION}.jar.asc
+
+RUN set -ex; \
+ wget -nv -O statefun-flink-distribution.jar "$DIST_JAR_URL"; \
+ wget -nv -O statefun-flink-distribution.jar.asc "$DIST_ASC_URL"; \
+ wget -nv -O statefun-flink-core.jar "$CORE_JAR_URL"; \
+ wget -nv -O statefun-flink-core.jar.asc "$CORE_ASC_URL"; \
+ \
+ export GNUPGHOME="$(mktemp -d)"; \
+ for server in ha.pool.sks-keyservers.net $(shuf -e \
+ hkp://p80.pool.sks-keyservers.net:80 \
+ keyserver.ubuntu.com \
+ hkp://keyserver.ubuntu.com:80 \
+ pgp.mit.edu) ; do \
+ gpg --batch --keyserver "$server" --recv-keys "$GPG_KEY" && break || : ; \
+ done && \
+ gpg --batch --verify statefun-flink-distribution.jar.asc statefun-flink-distribution.jar; \
+ gpg --batch --verify statefun-flink-core.jar.asc statefun-flink-core.jar; \
+ gpgconf --kill all; \
+ rm -rf "$GNUPGHOME" statefun-flink-distribution.jar.asc statefun-flink-core.jar.asc; \
+ \
+ mkdir -p $FLINK_HOME/lib; \
+ mv statefun-flink-distribution.jar $FLINK_HOME/lib; \
+ mv statefun-flink-core.jar $FLINK_HOME/lib;
+
+# add user modules
+USER root
+
+RUN mkdir -p $STATEFUN_MODULES && \
+ useradd --system --home-dir $STATEFUN_HOME --uid=9998 --gid=flink statefun && \
+ chown -R statefun:flink $STATEFUN_HOME && \
+ chmod -R g+rw $STATEFUN_HOME
+
+# add filesystem plugins
+RUN mkdir -p $FLINK_HOME/plugins/s3-fs-presto && \
+ mv $FLINK_HOME/opt/flink-s3-fs-presto-1.11.1.jar $FLINK_HOME/plugins/s3-fs-presto
+RUN mkdir -p $FLINK_HOME/plugins/swift-fs-hadoop && \
+ mv $FLINK_HOME/opt/flink-swift-fs-hadoop-1.11.1.jar $FLINK_HOME/plugins/swift-fs-hadoop
+RUN mkdir -p $FLINK_HOME/plugins/oss-fs-hadoop && \
+ mv $FLINK_HOME/opt/flink-oss-fs-hadoop-1.11.1.jar $FLINK_HOME/plugins/oss-fs-hadoop
+RUN mkdir -p $FLINK_HOME/plugins/azure-fs-hadoop && \
+ mv $FLINK_HOME/opt/flink-azure-fs-hadoop-1.11.1.jar $FLINK_HOME/plugins/azure-fs-hadoop
+
+# entry point
+ADD docker-entry-point.sh /docker-entry-point.sh
+
+ENTRYPOINT ["/docker-entry-point.sh"]
diff --git a/2.2.0-java8/docker-entry-point.sh b/2.2.0-java8/docker-entry-point.sh
new file mode 100755
index 0000000..4e502b4
--- /dev/null
+++ b/2.2.0-java8/docker-entry-point.sh
@@ -0,0 +1,56 @@
+#!/bin/bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+#
+# Role types
+#
+WORKER="worker"
+MASTER="master"
+
+#
+# Environment
+#
+FLINK_HOME=${FLINK_HOME:-"/opt/flink/bin"}
+ROLE=${ROLE:-"worker"}
+MASTER_HOST=${MASTER_HOST:-"localhost"}
+
+#
+# Start a service depending on the role.
+#
+if [[ "${ROLE}" == "${WORKER}" ]]; then
+ #
+ # start the TaskManager (worker role)
+ #
+ exec ${FLINK_HOME}/bin/taskmanager.sh start-foreground \
+ -Djobmanager.rpc.address=${MASTER_HOST}
+
+elif [[ "${ROLE}" == "${MASTER}" ]]; then
+ #
+ # start the JobManager (master role) with our predefined job.
+ #
+ exec $FLINK_HOME/bin/standalone-job.sh \
+ start-foreground \
+ -Djobmanager.rpc.address=${MASTER_HOST} \
+ "$@"
+else
+ #
+ # unknown role
+ #
+ echo "unknown role ${ROLE}"
+ exit 1
+fi
diff --git a/2.2.0-java8/flink-distribution/bin/config.sh b/2.2.0-java8/flink-distribution/bin/config.sh
new file mode 100755
index 0000000..268f3fc
--- /dev/null
+++ b/2.2.0-java8/flink-distribution/bin/config.sh
@@ -0,0 +1,574 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Backported due to FLINK-18639
+# This file can be removed when upgrading
+# to Flink 1.11.2 or 1.12
+
+constructFlinkClassPath() {
+ local FLINK_DIST
+ local FLINK_CLASSPATH
+
+ while read -d '' -r jarfile ; do
+ if [[ "$jarfile" =~ .*/flink-dist[^/]*.jar$ ]]; then
+ FLINK_DIST="$FLINK_DIST":"$jarfile"
+ elif [[ "$FLINK_CLASSPATH" == "" ]]; then
+ FLINK_CLASSPATH="$jarfile";
+ else
+ FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
+ fi
+ done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)
+
+ if [[ "$FLINK_DIST" == "" ]]; then
+ # write error message to stderr since stdout is stored as the classpath
+ (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
+
+ # exit function with empty classpath to force process failure
+ exit 1
+ fi
+
+ echo "$FLINK_CLASSPATH""$FLINK_DIST"
+}
+
+findFlinkDistJar() {
+ local FLINK_DIST="`find "$FLINK_LIB_DIR" -name 'flink-dist*.jar'`"
+
+ if [[ "$FLINK_DIST" == "" ]]; then
+ # write error message to stderr since stdout is stored as the classpath
+ (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
+
+ # exit function with empty classpath to force process failure
+ exit 1
+ fi
+
+ echo "$FLINK_DIST"
+}
+
+# These are used to mangle paths that are passed to java when using
+# cygwin. Cygwin paths are like linux paths, i.e. /path/to/somewhere
+# but the windows java version expects them in Windows Format, i.e. C:\bla\blub.
+# "cygpath" can do the conversion.
+manglePath() {
+ UNAME=$(uname -s)
+ if [ "${UNAME:0:6}" == "CYGWIN" ]; then
+ echo `cygpath -w "$1"`
+ else
+ echo $1
+ fi
+}
+
+manglePathList() {
+ UNAME=$(uname -s)
+ # a path list, for example a java classpath
+ if [ "${UNAME:0:6}" == "CYGWIN" ]; then
+ echo `cygpath -wp "$1"`
+ else
+ echo $1
+ fi
+}
+
+# Looks up a config value by key from a simple YAML-style key-value map.
+# $1: key to look up
+# $2: default value to return if key does not exist
+# $3: config file to read from
+readFromConfig() {
+ local key=$1
+ local defaultValue=$2
+ local configFile=$3
+
+ # first extract the value with the given key (1st sed), then trim the result (2nd sed)
+ # if a key exists multiple times, take the "last" one (tail)
+ local value=`sed -n "s/^[ ]*${key}[ ]*: \([^#]*\).*$/\1/p" "${configFile}" | sed "s/^ *//;s/ *$//" | tail -n 1`
+
+ [ -z "$value" ] && echo "$defaultValue" || echo "$value"
+}
+
+########################################################################################################################
+# DEFAULT CONFIG VALUES: These values will be used when nothing has been specified in conf/flink-conf.yaml
+# -or- the respective environment variables are not set.
+########################################################################################################################
+
+
+# WARNING !!! , these values are only used if nothing else is specified in
+# conf/flink-conf.yaml
+
+DEFAULT_ENV_PID_DIR="/tmp" # Directory to store *.pid files to
+DEFAULT_ENV_LOG_MAX=5 # Maximum number of old log files to keep
+DEFAULT_ENV_JAVA_OPTS="" # Optional JVM args
+DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args (JobManager)
+DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args (TaskManager)
+DEFAULT_ENV_JAVA_OPTS_HS="" # Optional JVM args (HistoryServer)
+DEFAULT_ENV_JAVA_OPTS_CLI="" # Optional JVM args (Client)
+DEFAULT_ENV_SSH_OPTS="" # Optional SSH parameters running in cluster mode
+DEFAULT_YARN_CONF_DIR="" # YARN Configuration Directory, if necessary
+DEFAULT_HADOOP_CONF_DIR="" # Hadoop Configuration Directory, if necessary
+DEFAULT_HBASE_CONF_DIR="" # HBase Configuration Directory, if necessary
+
+########################################################################################################################
+# CONFIG KEYS: The default values can be overwritten by the following keys in conf/flink-conf.yaml
+########################################################################################################################
+
+KEY_TASKM_COMPUTE_NUMA="taskmanager.compute.numa"
+
+KEY_ENV_PID_DIR="env.pid.dir"
+KEY_ENV_LOG_DIR="env.log.dir"
+KEY_ENV_LOG_MAX="env.log.max"
+KEY_ENV_YARN_CONF_DIR="env.yarn.conf.dir"
+KEY_ENV_HADOOP_CONF_DIR="env.hadoop.conf.dir"
+KEY_ENV_HBASE_CONF_DIR="env.hbase.conf.dir"
+KEY_ENV_JAVA_HOME="env.java.home"
+KEY_ENV_JAVA_OPTS="env.java.opts"
+KEY_ENV_JAVA_OPTS_JM="env.java.opts.jobmanager"
+KEY_ENV_JAVA_OPTS_TM="env.java.opts.taskmanager"
+KEY_ENV_JAVA_OPTS_HS="env.java.opts.historyserver"
+KEY_ENV_JAVA_OPTS_CLI="env.java.opts.client"
+KEY_ENV_SSH_OPTS="env.ssh.opts"
+KEY_HIGH_AVAILABILITY="high-availability"
+KEY_ZK_HEAP_MB="zookeeper.heap.mb"
+
+########################################################################################################################
+# PATHS AND CONFIG
+########################################################################################################################
+
+target="$0"
+# For the case, the executable has been directly symlinked, figure out
+# the correct bin path by following its symlink up to an upper bound.
+# Note: we can't use the readlink utility here if we want to be POSIX
+# compatible.
+iteration=0
+while [ -L "$target" ]; do
+ if [ "$iteration" -gt 100 ]; then
+ echo "Cannot resolve path: You have a cyclic symlink in $target."
+ break
+ fi
+ ls=`ls -ld -- "$target"`
+ target=`expr "$ls" : '.* -> \(.*\)$'`
+ iteration=$((iteration + 1))
+done
+
+# Convert relative path to absolute path and resolve directory symlinks
+bin=`dirname "$target"`
+SYMLINK_RESOLVED_BIN=`cd "$bin"; pwd -P`
+
+# Define the main directory of the flink installation
+# If config.sh is called by pyflink-shell.sh in the python bin directory (pip installed), then there is no need to set FLINK_HOME here.
+if [ -z "$_FLINK_HOME_DETERMINED" ]; then
+ FLINK_HOME=`dirname "$SYMLINK_RESOLVED_BIN"`
+fi
+FLINK_LIB_DIR=$FLINK_HOME/lib
+FLINK_PLUGINS_DIR=$FLINK_HOME/plugins
+FLINK_OPT_DIR=$FLINK_HOME/opt
+
+
+# These need to be mangled because they are directly passed to java.
+# The above lib path is used by the shell script to retrieve jars in a
+# directory, so it needs to be unmangled.
+FLINK_HOME_DIR_MANGLED=`manglePath "$FLINK_HOME"`
+if [ -z "$FLINK_CONF_DIR" ]; then FLINK_CONF_DIR=$FLINK_HOME_DIR_MANGLED/conf; fi
+FLINK_BIN_DIR=$FLINK_HOME_DIR_MANGLED/bin
+DEFAULT_FLINK_LOG_DIR=$FLINK_HOME_DIR_MANGLED/log
+FLINK_CONF_FILE="flink-conf.yaml"
+YAML_CONF=${FLINK_CONF_DIR}/${FLINK_CONF_FILE}
+
+### Exported environment variables ###
+export FLINK_CONF_DIR
+export FLINK_BIN_DIR
+export FLINK_PLUGINS_DIR
+# export /lib dir to access it during deployment of the Yarn staging files
+export FLINK_LIB_DIR
+# export /opt dir to access it for the SQL client
+export FLINK_OPT_DIR
+
+########################################################################################################################
+# ENVIRONMENT VARIABLES
+########################################################################################################################
+
+# read JAVA_HOME from config with no default value
+MY_JAVA_HOME=$(readFromConfig ${KEY_ENV_JAVA_HOME} "" "${YAML_CONF}")
+# check if config specified JAVA_HOME
+if [ -z "${MY_JAVA_HOME}" ]; then
+ # config did not specify JAVA_HOME. Use system JAVA_HOME
+ MY_JAVA_HOME=${JAVA_HOME}
+fi
+# check if we have a valid JAVA_HOME and if java is not available
+if [ -z "${MY_JAVA_HOME}" ] && ! type java > /dev/null 2> /dev/null; then
+ echo "Please specify JAVA_HOME. Either in Flink config ./conf/flink-conf.yaml or as system-wide JAVA_HOME."
+ exit 1
+else
+ JAVA_HOME=${MY_JAVA_HOME}
+fi
+
+UNAME=$(uname -s)
+if [ "${UNAME:0:6}" == "CYGWIN" ]; then
+ JAVA_RUN=java
+else
+ if [[ -d $JAVA_HOME ]]; then
+ JAVA_RUN=$JAVA_HOME/bin/java
+ else
+ JAVA_RUN=java
+ fi
+fi
+
+# Define HOSTNAME if it is not already set
+if [ -z "${HOSTNAME}" ]; then
+ HOSTNAME=`hostname`
+fi
+
+IS_NUMBER="^[0-9]+$"
+
+# Verify that NUMA tooling is available
+command -v numactl >/dev/null 2>&1
+if [[ $? -ne 0 ]]; then
+ FLINK_TM_COMPUTE_NUMA="false"
+else
+ # Define FLINK_TM_COMPUTE_NUMA if it is not already set
+ if [ -z "${FLINK_TM_COMPUTE_NUMA}" ]; then
+ FLINK_TM_COMPUTE_NUMA=$(readFromConfig ${KEY_TASKM_COMPUTE_NUMA} "false" "${YAML_CONF}")
+ fi
+fi
+
+if [ -z "${MAX_LOG_FILE_NUMBER}" ]; then
+ MAX_LOG_FILE_NUMBER=$(readFromConfig ${KEY_ENV_LOG_MAX} ${DEFAULT_ENV_LOG_MAX} "${YAML_CONF}")
+fi
+
+if [ -z "${FLINK_LOG_DIR}" ]; then
+ FLINK_LOG_DIR=$(readFromConfig ${KEY_ENV_LOG_DIR} "${DEFAULT_FLINK_LOG_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${YARN_CONF_DIR}" ]; then
+ YARN_CONF_DIR=$(readFromConfig ${KEY_ENV_YARN_CONF_DIR} "${DEFAULT_YARN_CONF_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${HADOOP_CONF_DIR}" ]; then
+ HADOOP_CONF_DIR=$(readFromConfig ${KEY_ENV_HADOOP_CONF_DIR} "${DEFAULT_HADOOP_CONF_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${HBASE_CONF_DIR}" ]; then
+ HBASE_CONF_DIR=$(readFromConfig ${KEY_ENV_HBASE_CONF_DIR} "${DEFAULT_HBASE_CONF_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${FLINK_PID_DIR}" ]; then
+ FLINK_PID_DIR=$(readFromConfig ${KEY_ENV_PID_DIR} "${DEFAULT_ENV_PID_DIR}" "${YAML_CONF}")
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS}" ]; then
+ FLINK_ENV_JAVA_OPTS=$(readFromConfig ${KEY_ENV_JAVA_OPTS} "${DEFAULT_ENV_JAVA_OPTS}" "${YAML_CONF}")
+
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS="$( echo "${FLINK_ENV_JAVA_OPTS}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS_JM}" ]; then
+ FLINK_ENV_JAVA_OPTS_JM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_JM} "${DEFAULT_ENV_JAVA_OPTS_JM}" "${YAML_CONF}")
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS_JM="$( echo "${FLINK_ENV_JAVA_OPTS_JM}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
+ FLINK_ENV_JAVA_OPTS_TM=$(readFromConfig ${KEY_ENV_JAVA_OPTS_TM} "${DEFAULT_ENV_JAVA_OPTS_TM}" "${YAML_CONF}")
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS_TM="$( echo "${FLINK_ENV_JAVA_OPTS_TM}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS_HS}" ]; then
+ FLINK_ENV_JAVA_OPTS_HS=$(readFromConfig ${KEY_ENV_JAVA_OPTS_HS} "${DEFAULT_ENV_JAVA_OPTS_HS}" "${YAML_CONF}")
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS_HS="$( echo "${FLINK_ENV_JAVA_OPTS_HS}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_ENV_JAVA_OPTS_CLI}" ]; then
+ FLINK_ENV_JAVA_OPTS_CLI=$(readFromConfig ${KEY_ENV_JAVA_OPTS_CLI} "${DEFAULT_ENV_JAVA_OPTS_CLI}" "${YAML_CONF}")
+ # Remove leading and ending double quotes (if present) of value
+ FLINK_ENV_JAVA_OPTS_CLI="$( echo "${FLINK_ENV_JAVA_OPTS_CLI}" | sed -e 's/^"//' -e 's/"$//' )"
+fi
+
+if [ -z "${FLINK_SSH_OPTS}" ]; then
+ FLINK_SSH_OPTS=$(readFromConfig ${KEY_ENV_SSH_OPTS} "${DEFAULT_ENV_SSH_OPTS}" "${YAML_CONF}")
+fi
+
+# Define ZK_HEAP if it is not already set
+if [ -z "${ZK_HEAP}" ]; then
+ ZK_HEAP=$(readFromConfig ${KEY_ZK_HEAP_MB} 0 "${YAML_CONF}")
+fi
+
+# High availability
+if [ -z "${HIGH_AVAILABILITY}" ]; then
+ HIGH_AVAILABILITY=$(readFromConfig ${KEY_HIGH_AVAILABILITY} "" "${YAML_CONF}")
+ if [ -z "${HIGH_AVAILABILITY}" ]; then
+ # Try deprecated value
+ DEPRECATED_HA=$(readFromConfig "recovery.mode" "" "${YAML_CONF}")
+ if [ -z "${DEPRECATED_HA}" ]; then
+ HIGH_AVAILABILITY="none"
+ elif [ ${DEPRECATED_HA} == "standalone" ]; then
+ # Standalone is now 'none'
+ HIGH_AVAILABILITY="none"
+ else
+ HIGH_AVAILABILITY=${DEPRECATED_HA}
+ fi
+ fi
+fi
+
+# Arguments for the JVM. Used for job and task manager JVMs.
+# DO NOT USE FOR MEMORY SETTINGS! Use conf/flink-conf.yaml with keys
+# JobManagerOptions#TOTAL_PROCESS_MEMORY and TaskManagerOptions#TOTAL_PROCESS_MEMORY for that!
+if [ -z "${JVM_ARGS}" ]; then
+ JVM_ARGS=""
+fi
+
+# Check if deprecated HADOOP_HOME is set, and specify config path to HADOOP_CONF_DIR if it's empty.
+if [ -z "$HADOOP_CONF_DIR" ]; then
+ if [ -n "$HADOOP_HOME" ]; then
+ # HADOOP_HOME is set. Check if its a Hadoop 1.x or 2.x HADOOP_HOME path
+ if [ -d "$HADOOP_HOME/conf" ]; then
+ # It's Hadoop 1.x
+ HADOOP_CONF_DIR="$HADOOP_HOME/conf"
+ fi
+ if [ -d "$HADOOP_HOME/etc/hadoop" ]; then
+ # It's Hadoop 2.2+
+ HADOOP_CONF_DIR="$HADOOP_HOME/etc/hadoop"
+ fi
+ fi
+fi
+
+# if neither HADOOP_CONF_DIR nor HADOOP_CLASSPATH are set, use some common default (if available)
+if [ -z "$HADOOP_CONF_DIR" ] && [ -z "$HADOOP_CLASSPATH" ]; then
+ if [ -d "/etc/hadoop/conf" ]; then
+ echo "Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set."
+ HADOOP_CONF_DIR="/etc/hadoop/conf"
+ fi
+fi
+
+# Check if deprecated HBASE_HOME is set, and specify config path to HBASE_CONF_DIR if it's empty.
+if [ -z "$HBASE_CONF_DIR" ]; then
+ if [ -n "$HBASE_HOME" ]; then
+ # HBASE_HOME is set.
+ if [ -d "$HBASE_HOME/conf" ]; then
+ HBASE_CONF_DIR="$HBASE_HOME/conf"
+ fi
+ fi
+fi
+
+# try and set HBASE_CONF_DIR to some common default if it's not set
+if [ -z "$HBASE_CONF_DIR" ]; then
+ if [ -d "/etc/hbase/conf" ]; then
+ echo "Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set."
+ HBASE_CONF_DIR="/etc/hbase/conf"
+ fi
+fi
+
+INTERNAL_HADOOP_CLASSPATHS="${HADOOP_CLASSPATH}:${HADOOP_CONF_DIR}:${YARN_CONF_DIR}"
+
+if [ -n "${HBASE_CONF_DIR}" ]; then
+ INTERNAL_HADOOP_CLASSPATHS="${INTERNAL_HADOOP_CLASSPATHS}:${HBASE_CONF_DIR}"
+fi
+
+# Auxiliary function which extracts the name of host from a line which
+# also potentially includes topology information and the taskManager type
+extractHostName() {
+ # handle comments: extract first part of string (before first # character)
+ WORKER=`echo $1 | cut -d'#' -f 1`
+
+ # Extract the hostname from the network hierarchy
+ if [[ "$WORKER" =~ ^.*/([0-9a-zA-Z.-]+)$ ]]; then
+ WORKER=${BASH_REMATCH[1]}
+ fi
+
+ echo $WORKER
+}
+
+# Auxiliary functions for log file rotation
+rotateLogFilesWithPrefix() {
+ dir=$1
+ prefix=$2
+ while read -r log ; do
+ rotateLogFile "$log"
+ # find distinct set of log file names, ignoring the rotation number (trailing dot and digit)
+ done < <(find "$dir" ! -type d -path "${prefix}*" | sed s/\.[0-9][0-9]*$// | sort | uniq)
+}
+
+rotateLogFile() {
+ log=$1;
+ num=$MAX_LOG_FILE_NUMBER
+ if [ -f "$log" -a "$num" -gt 0 ]; then
+ while [ $num -gt 1 ]; do
+ prev=`expr $num - 1`
+ [ -f "$log.$prev" ] && mv "$log.$prev" "$log.$num"
+ num=$prev
+ done
+ mv "$log" "$log.$num";
+ fi
+}
+
+readMasters() {
+ MASTERS_FILE="${FLINK_CONF_DIR}/masters"
+
+ if [[ ! -f "${MASTERS_FILE}" ]]; then
+ echo "No masters file. Please specify masters in 'conf/masters'."
+ exit 1
+ fi
+
+ MASTERS=()
+ WEBUIPORTS=()
+
+ MASTERS_ALL_LOCALHOST=true
+ GOON=true
+ while $GOON; do
+ read line || GOON=false
+ HOSTWEBUIPORT=$( extractHostName $line)
+
+ if [ -n "$HOSTWEBUIPORT" ]; then
+ HOST=$(echo $HOSTWEBUIPORT | cut -f1 -d:)
+ WEBUIPORT=$(echo $HOSTWEBUIPORT | cut -s -f2 -d:)
+ MASTERS+=(${HOST})
+
+ if [ -z "$WEBUIPORT" ]; then
+ WEBUIPORTS+=(0)
+ else
+ WEBUIPORTS+=(${WEBUIPORT})
+ fi
+
+ if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
+ MASTERS_ALL_LOCALHOST=false
+ fi
+ fi
+ done < "$MASTERS_FILE"
+}
+
+readWorkers() {
+ WORKERS_FILE="${FLINK_CONF_DIR}/workers"
+
+ if [[ ! -f "$WORKERS_FILE" ]]; then
+ echo "No workers file. Please specify workers in 'conf/workers'."
+ exit 1
+ fi
+
+ WORKERS=()
+
+ WORKERS_ALL_LOCALHOST=true
+ GOON=true
+ while $GOON; do
+ read line || GOON=false
+ HOST=$( extractHostName $line)
+ if [ -n "$HOST" ] ; then
+ WORKERS+=(${HOST})
+ if [ "${HOST}" != "localhost" ] && [ "${HOST}" != "127.0.0.1" ] ; then
+ WORKERS_ALL_LOCALHOST=false
+ fi
+ fi
+ done < "$WORKERS_FILE"
+}
+
+# starts or stops TMs on all workers
+# TMWorkers start|stop
+TMWorkers() {
+ CMD=$1
+
+ readWorkers
+
+ if [ ${WORKERS_ALL_LOCALHOST} = true ] ; then
+ # all-local setup
+ for worker in ${WORKERS[@]}; do
+ "${FLINK_BIN_DIR}"/taskmanager.sh "${CMD}"
+ done
+ else
+ # non-local setup
+ # start/stop TaskManager instance(s) using pdsh (Parallel Distributed Shell) when available
+ command -v pdsh >/dev/null 2>&1
+ if [[ $? -ne 0 ]]; then
+ for worker in ${WORKERS[@]}; do
+ ssh -n $FLINK_SSH_OPTS $worker -- "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\" &"
+ done
+ else
+ PDSH_SSH_ARGS="" PDSH_SSH_ARGS_APPEND=$FLINK_SSH_OPTS pdsh -w $(IFS=, ; echo "${WORKERS[*]}") \
+ "nohup /bin/bash -l \"${FLINK_BIN_DIR}/taskmanager.sh\" \"${CMD}\""
+ fi
+ fi
+}
+
+runBashJavaUtilsCmd() {
+ local cmd=$1
+ local conf_dir=$2
+ local class_path=$3
+ local dynamic_args=${@:4}
+ class_path=`manglePathList "${class_path}"`
+
+ local output=`${JAVA_RUN} -classpath "${class_path}" org.apache.flink.runtime.util.bash.BashJavaUtils ${cmd} --configDir "${conf_dir}" $dynamic_args 2>&1 | tail -n 1000`
+ if [[ $? -ne 0 ]]; then
+ echo "[ERROR] Cannot run BashJavaUtils to execute command ${cmd}." 1>&2
+ # Print the output in case the user redirect the log to console.
+ echo "$output" 1>&2
+ exit 1
+ fi
+
+ echo "$output"
+}
+
+extractExecutionResults() {
+ local output="$1"
+ local expected_lines="$2"
+ local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
+ local execution_results
+ local num_lines
+
+ execution_results=$(echo "${output}" | grep ${EXECUTION_PREFIX})
+ num_lines=$(echo "${execution_results}" | wc -l)
+ # explicit check for empty result, because if execution_results is empty, then wc returns 1
+ if [[ -z ${execution_results} ]]; then
+ echo "[ERROR] The execution result is empty." 1>&2
+ exit 1
+ fi
+ if [[ ${num_lines} -ne ${expected_lines} ]]; then
+ echo "[ERROR] The execution results has unexpected number of lines, expected: ${expected_lines}, actual: ${num_lines}." 1>&2
+ echo "[ERROR] An execution result line is expected following the prefix '${EXECUTION_PREFIX}'" 1>&2
+ echo "$output" 1>&2
+ exit 1
+ fi
+
+ echo "${execution_results//${EXECUTION_PREFIX}/}"
+}
+
+extractLoggingOutputs() {
+ local output="$1"
+ local EXECUTION_PREFIX="BASH_JAVA_UTILS_EXEC_RESULT:"
+
+ echo "${output}" | grep -v ${EXECUTION_PREFIX}
+}
+
+parseJmJvmArgsAndExportLogs() {
+ java_utils_output=$(runBashJavaUtilsCmd GET_JM_RESOURCE_PARAMS "${FLINK_CONF_DIR}" "${FLINK_BIN_DIR}/bash-java-utils.jar:$(findFlinkDistJar)" "$@")
+ logging_output=$(extractLoggingOutputs "${java_utils_output}")
+ jvm_params=$(extractExecutionResults "${java_utils_output}" 1)
+
+ if [[ $? -ne 0 ]]; then
+ echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
+ echo "[ERROR] Raw output from BashJavaUtils:"
+ echo "$java_utils_output"
+ exit 1
+ fi
+
+ export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
+
+ export FLINK_INHERITED_LOGS="
+$FLINK_INHERITED_LOGS
+
+JM_RESOURCE_PARAMS extraction logs:
+jvm_params: $jvm_params
+logs: $logging_output
+"
+}
diff --git a/2.2.0-java8/flink-distribution/bin/flink-console.sh b/2.2.0-java8/flink-distribution/bin/flink-console.sh
new file mode 100755
index 0000000..8fc0b05
--- /dev/null
+++ b/2.2.0-java8/flink-distribution/bin/flink-console.sh
@@ -0,0 +1,113 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# This file was taken from Apache Flink, and modified to include another entry point
+
+# Start a Flink service as a console application. Must be stopped with Ctrl-C
+# or with SIGTERM by kill or the controlling process.
+USAGE="Usage: flink-console.sh (taskexecutor|zookeeper|historyserver|standalonesession|standalonejob|statefun) [args]"
+
+SERVICE=$1
+ARGS=("${@:2}") # get remaining arguments as array
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+case $SERVICE in
+ (taskexecutor)
+ CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
+ ;;
+
+ (historyserver)
+ CLASS_TO_RUN=org.apache.flink.runtime.webmonitor.history.HistoryServer
+ ;;
+
+ (zookeeper)
+ CLASS_TO_RUN=org.apache.flink.runtime.zookeeper.FlinkZooKeeperQuorumPeer
+ ;;
+
+ (standalonesession)
+ CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
+ ;;
+
+ (standalonejob)
+ CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
+ ;;
+
+ (statefun)
+ CLASS_TO_RUN=org.apache.flink.statefun.flink.launcher.StatefulFunctionsClusterEntryPoint
+ ;;
+
+ (*)
+ echo "Unknown service '${SERVICE}'. $USAGE."
+ exit 1
+ ;;
+esac
+
+FLINK_TM_CLASSPATH=`constructFlinkClassPath`
+
+if [ "$FLINK_IDENT_STRING" = "" ]; then
+ FLINK_IDENT_STRING="$USER"
+fi
+
+pid=$FLINK_PID_DIR/flink-$FLINK_IDENT_STRING-$SERVICE.pid
+mkdir -p "$FLINK_PID_DIR"
+# The lock needs to be released after use because this script is started in the foreground
+command -v flock >/dev/null 2>&1
+flock_exist=$?
+if [[ ${flock_exist} -eq 0 ]]; then
+ exec 200<"$FLINK_PID_DIR"
+ flock 200
+fi
+# Remove the pid file when all the processes are dead
+if [ -f "$pid" ]; then
+ all_dead=0
+ while read each_pid; do
+ # Check whether the process is still running
+ kill -0 $each_pid > /dev/null 2>&1
+ [[ $? -eq 0 ]] && all_dead=1
+ done < "$pid"
+ [ ${all_dead} -eq 0 ] && rm $pid
+fi
+id=$([ -f "$pid" ] && echo $(wc -l < "$pid") || echo "0")
+
+FLINK_LOG_PREFIX="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-${SERVICE}-${id}-${HOSTNAME}"
+log="${FLINK_LOG_PREFIX}.log"
+
+log_setting=("-Dlog.file=${log}" "-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlog4j.configurationFile=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml")
+
+JAVA_VERSION=$(${JAVA_RUN} -version 2>&1 | sed 's/.*version "\(.*\)\.\(.*\)\..*"/\1\2/; 1q')
+
+# Only set JVM 8 arguments if we have correctly extracted the version
+if [[ ${JAVA_VERSION} =~ ${IS_NUMBER} ]]; then
+ if [ "$JAVA_VERSION" -lt 18 ]; then
+ JVM_ARGS="$JVM_ARGS -XX:MaxPermSize=256m"
+ fi
+fi
+
+echo "Starting $SERVICE as a console application on host $HOSTNAME."
+
+# Add the current process id to pid file
+echo $$ >> "$pid" 2>/dev/null
+
+# Release the lock because the java process runs in the foreground and would block other processes from modifying the pid file
+[[ ${flock_exist} -eq 0 ]] && flock -u 200
+
+exec $JAVA_RUN $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" -classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} "${ARGS[@]}"
diff --git a/2.2.0-java8/flink-distribution/bin/standalone-job.sh b/2.2.0-java8/flink-distribution/bin/standalone-job.sh
new file mode 100755
index 0000000..f7709e9
--- /dev/null
+++ b/2.2.0-java8/flink-distribution/bin/standalone-job.sh
@@ -0,0 +1,50 @@
+#!/usr/bin/env bash
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+# This file was taken from Apache Flink, and modified to include another entry point
+
+# Start/stop a Flink JobManager.
+USAGE="Usage: standalone-job.sh ((start|start-foreground))|stop [args]"
+
+STARTSTOP=$1
+ENTRY_POINT_NAME="statefun"
+
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]]; then
+ echo $USAGE
+ exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+# Startup parameters
+ARGS=("--configDir" "${FLINK_CONF_DIR}" "${@:2}")
+
+if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
+ # Add cluster entry point specific JVM options
+ export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
+ parseJmJvmArgsAndExportLogs "${ARGS[@]}"
+fi
+
+if [[ $STARTSTOP == "start-foreground" ]]; then
+ exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}"
+else
+ "${FLINK_BIN_DIR}"/flink-daemon.sh ${STARTSTOP} ${ENTRY_POINT_NAME} "${ARGS[@]}"
+fi
diff --git a/2.2.0-java8/flink-distribution/bin/taskmanager.sh b/2.2.0-java8/flink-distribution/bin/taskmanager.sh
new file mode 100755
index 0000000..dcfca5c
--- /dev/null
+++ b/2.2.0-java8/flink-distribution/bin/taskmanager.sh
@@ -0,0 +1,103 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Backported due to FLINK-18639
+# This file can be removed when upgrading
+# to Flink 1.11.2 or 1.12
+
+# Start/stop a Flink TaskManager.
+USAGE="Usage: taskmanager.sh (start|start-foreground|stop|stop-all)"
+
+STARTSTOP=$1
+
+ARGS=("${@:2}")
+
+if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]] && [[ $STARTSTOP != "stop-all" ]]; then
+ echo $USAGE
+ exit 1
+fi
+
+bin=`dirname "$0"`
+bin=`cd "$bin"; pwd`
+
+. "$bin"/config.sh
+
+ENTRYPOINT=taskexecutor
+
+if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
+
+ # if no other JVM options are set, set the GC to G1
+ if [ -z "${FLINK_ENV_JAVA_OPTS}" ] && [ -z "${FLINK_ENV_JAVA_OPTS_TM}" ]; then
+ export JVM_ARGS="$JVM_ARGS -XX:+UseG1GC"
+ fi
+
+ # Add TaskManager-specific JVM options
+ export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_TM}"
+
+ # Startup parameters
+
+ java_utils_output=$(runBashJavaUtilsCmd GET_TM_RESOURCE_PARAMS "${FLINK_CONF_DIR}" "$FLINK_BIN_DIR/bash-java-utils.jar:$(findFlinkDistJar)" "${ARGS[@]}")
+
+ logging_output=$(extractLoggingOutputs "${java_utils_output}")
+ params_output=$(extractExecutionResults "${java_utils_output}" 2)
+
+ if [[ $? -ne 0 ]]; then
+ echo "[ERROR] Could not get JVM parameters and dynamic configurations properly."
+ echo "[ERROR] Raw output from BashJavaUtils:"
+ echo "$java_utils_output"
+ exit 1
+ fi
+
+ jvm_params=$(echo "${params_output}" | head -n 1)
+ export JVM_ARGS="${JVM_ARGS} ${jvm_params}"
+
+ IFS=$" " dynamic_configs=$(echo "${params_output}" | tail -n 1)
+ ARGS+=("--configDir" "${FLINK_CONF_DIR}" ${dynamic_configs[@]})
+
+ export FLINK_INHERITED_LOGS="
+$FLINK_INHERITED_LOGS
+
+TM_RESOURCE_PARAMS extraction logs:
+jvm_params: $jvm_params
+dynamic_configs: $dynamic_configs
+logs: $logging_output
+"
+fi
+
+if [[ $STARTSTOP == "start-foreground" ]]; then
+ exec "${FLINK_BIN_DIR}"/flink-console.sh $ENTRYPOINT "${ARGS[@]}"
+else
+ if [[ $FLINK_TM_COMPUTE_NUMA == "false" ]]; then
+ # Start a single TaskManager
+ "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
+ else
+ # Example output from `numactl --show` on an AWS c4.8xlarge:
+ # policy: default
+ # preferred node: current
+ # physcpubind: 0 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
+ # cpubind: 0 1
+ # nodebind: 0 1
+ # membind: 0 1
+ read -ra NODE_LIST <<< $(numactl --show | grep "^nodebind: ")
+ for NODE_ID in "${NODE_LIST[@]:1}"; do
+ # Start a TaskManager for each NUMA node
+ numactl --membind=$NODE_ID --cpunodebind=$NODE_ID -- "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${ARGS[@]}"
+ done
+ fi
+fi
diff --git a/2.2.0-java8/flink-distribution/conf/flink-conf.yaml b/2.2.0-java8/flink-distribution/conf/flink-conf.yaml
new file mode 100644
index 0000000..8b9b4e4
--- /dev/null
+++ b/2.2.0-java8/flink-distribution/conf/flink-conf.yaml
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# This file is the base for the Apache Flink configuration
+
+statefun.flink-job-name: Statefun Application
+
+#==============================================================================
+# Configurations strictly required by Stateful Functions. Do not change.
+#==============================================================================
+
+classloader.parent-first-patterns.additional: org.apache.flink.statefun;org.apache.kafka;com.google.protobuf
+
+#==============================================================================
+# Fault tolerance, checkpointing and recovery.
+# For more related configuration options, please see: https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html#fault-tolerance
+#==============================================================================
+
+# Uncomment the below to enable checkpointing for your application
+#execution.checkpointing.mode: EXACTLY_ONCE
+#execution.checkpointing.interval: 5sec
+
+restart-strategy: fixed-delay
+restart-strategy.fixed-delay.attempts: 2147483647
+restart-strategy.fixed-delay.delay: 1sec
+
+state.backend: rocksdb
+state.backend.rocksdb.timer-service.factory: ROCKSDB
+state.checkpoints.dir: file:///checkpoint-dir
+state.backend.incremental: true
+
+#==============================================================================
+# Recommended memory configurations. Users may change according to their needs.
+#==============================================================================
+
+jobmanager.memory.process.size: 1g
+taskmanager.memory.process.size: 4g