[FLINK-18880][python] Respect configurations defined in flink-conf.yaml and environment variables when executing in local mode
This closes #17216.
diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py
index b137748..a05bbe1 100644
--- a/flink-python/pyflink/pyflink_gateway_server.py
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -30,27 +30,41 @@
from pyflink.find_flink_home import _find_flink_home, _find_flink_source_root
+KEY_ENV_LOG_DIR = "env.log.dir"
+KEY_ENV_YARN_CONF_DIR = "env.yarn.conf.dir"
+KEY_ENV_HADOOP_CONF_DIR = "env.hadoop.conf.dir"
+KEY_ENV_HBASE_CONF_DIR = "env.hbase.conf.dir"
+KEY_ENV_JAVA_HOME = "env.java.home"
+KEY_ENV_JAVA_OPTS = "env.java.opts"
+
def on_windows():
return platform.system() == "Windows"
-def find_java_executable():
- java_executable = "java.exe" if on_windows() else "java"
- flink_home = _find_flink_home()
- flink_conf_path = os.path.join(flink_home, "conf", "flink-conf.yaml")
- java_home = None
-
+def read_from_config(key, default_value, flink_conf_file):
+ value = default_value
# get the realpath of tainted path value to avoid CWE22 problem that constructs a path or URI
# using the tainted value and might allow an attacker to access, modify, or test the existence
# of critical or sensitive files.
- real_flink_conf_path = os.path.realpath(flink_conf_path)
- if os.path.isfile(real_flink_conf_path):
- with open(real_flink_conf_path, "r") as f:
- flink_conf_yaml = f.read()
- java_homes = re.findall(r'^[ ]*env\.java\.home[ ]*: ([^#]*).*$', flink_conf_yaml)
- if len(java_homes) > 1:
- java_home = java_homes[len(java_homes) - 1].strip()
+ with open(os.path.realpath(flink_conf_file), "r") as f:
+ while True:
+ line = f.readline()
+ if not line:
+ break
+ if line.startswith("#") or len(line.strip()) == 0:
+ continue
+ k, v = line.split(":", 1)
+ if k.strip() == key:
+ value = v.strip()
+ return value
+
+
+def find_java_executable():
+ java_executable = "java.exe" if on_windows() else "java"
+ flink_home = _find_flink_home()
+ flink_conf_file = os.path.join(flink_home, "conf", "flink-conf.yaml")
+ java_home = read_from_config(KEY_ENV_JAVA_HOME, None, flink_conf_file)
if java_home is None and "JAVA_HOME" in os.environ:
java_home = os.environ["JAVA_HOME"]
@@ -61,7 +75,41 @@
return java_executable
-def construct_log_settings():
+def prepare_environment_variables(env):
+ flink_home = _find_flink_home()
+ # get the realpath of tainted path value to avoid CWE22 problem that constructs a path or URI
+ # using the tainted value and might allow an attacker to access, modify, or test the existence
+ # of critical or sensitive files.
+ real_flink_home = os.path.realpath(flink_home)
+
+ if 'FLINK_CONF_DIR' in env:
+ flink_conf_directory = os.path.realpath(env['FLINK_CONF_DIR'])
+ else:
+ flink_conf_directory = os.path.join(real_flink_home, "conf")
+ env['FLINK_CONF_DIR'] = flink_conf_directory
+
+ if 'FLINK_LIB_DIR' in env:
+ flink_lib_directory = os.path.realpath(env['FLINK_LIB_DIR'])
+ else:
+ flink_lib_directory = os.path.join(real_flink_home, "lib")
+ env['FLINK_LIB_DIR'] = flink_lib_directory
+
+ if 'FLINK_OPT_DIR' in env:
+ flink_opt_directory = os.path.realpath(env['FLINK_OPT_DIR'])
+ else:
+ flink_opt_directory = os.path.join(real_flink_home, "opt")
+ env['FLINK_OPT_DIR'] = flink_opt_directory
+
+ if 'FLINK_PLUGINS_DIR' in env:
+ flink_plugins_directory = os.path.realpath(env['FLINK_PLUGINS_DIR'])
+ else:
+ flink_plugins_directory = os.path.join(real_flink_home, "plugins")
+ env['FLINK_PLUGINS_DIR'] = flink_plugins_directory
+
+ env["FLINK_BIN_DIR"] = os.path.join(real_flink_home, "bin")
+
+
+def construct_log_settings(env):
templates = [
"-Dlog.file=${flink_log_dir}/flink-${flink_ident_string}-python-${hostname}.log",
"-Dlog4j.configuration=${log4j_properties}",
@@ -69,25 +117,28 @@
"-Dlogback.configurationFile=${logback_xml}"
]
- flink_home = _find_flink_home()
- flink_conf_dir = os.path.join(flink_home, "conf")
- if "FLINK_LOG_DIR" in os.environ:
- flink_log_dir = os.environ["FLINK_LOG_DIR"]
- else:
- flink_log_dir = os.path.join(flink_home, "log")
+ flink_home = os.path.realpath(_find_flink_home())
+ flink_conf_dir = env['FLINK_CONF_DIR']
+ flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
- if "LOG4J_PROPERTIES" in os.environ:
- log4j_properties = os.environ["LOG4J_PROPERTIES"]
+ if "FLINK_LOG_DIR" in env:
+ flink_log_dir = env["FLINK_LOG_DIR"]
+ else:
+ flink_log_dir = read_from_config(
+ KEY_ENV_LOG_DIR, os.path.join(flink_home, "log"), flink_conf_file)
+
+ if "LOG4J_PROPERTIES" in env:
+ log4j_properties = env["LOG4J_PROPERTIES"]
else:
log4j_properties = "%s/log4j-cli.properties" % flink_conf_dir
- if "LOGBACK_XML" in os.environ:
- logback_xml = os.environ["LOGBACK_XML"]
+ if "LOGBACK_XML" in env:
+ logback_xml = env["LOGBACK_XML"]
else:
logback_xml = "%s/logback.xml" % flink_conf_dir
- if "FLINK_IDENT_STRING" in os.environ:
- flink_ident_string = os.environ["FLINK_IDENT_STRING"]
+ if "FLINK_IDENT_STRING" in env:
+ flink_ident_string = env["FLINK_IDENT_STRING"]
else:
flink_ident_string = getpass.getuser()
@@ -103,20 +154,21 @@
return log_settings
-def construct_classpath():
+def get_jvm_opts(env):
+ flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
+ jvm_opts = env.get(
+ 'FLINK_ENV_JAVA_OPTS', read_from_config(KEY_ENV_JAVA_OPTS, "", flink_conf_file))
+
+ # Remove leading and ending double quotes (if present) of value
+ jvm_opts = jvm_opts.strip("\"")
+ return jvm_opts
+
+
+def construct_flink_classpath(env):
flink_home = _find_flink_home()
- # get the realpath of tainted path value to avoid CWE22 problem that constructs a path or URI
- # using the tainted value and might allow an attacker to access, modify, or test the existence
- # of critical or sensitive files.
- real_flink_home = os.path.realpath(flink_home)
- if 'FLINK_LIB_DIR' in os.environ:
- flink_lib_directory = os.path.realpath(os.environ['FLINK_LIB_DIR'])
- else:
- flink_lib_directory = os.path.join(real_flink_home, "lib")
- if 'FLINK_OPT_DIR' in os.environ:
- flink_opt_directory = os.path.realpath(os.environ['FLINK_OPT_DIR'])
- else:
- flink_opt_directory = os.path.join(real_flink_home, "opt")
+ flink_lib_directory = env['FLINK_LIB_DIR']
+ flink_opt_directory = env['FLINK_OPT_DIR']
+
if on_windows():
# The command length is limited on Windows. To avoid the problem we should shorten the
# command length as much as possible.
@@ -134,6 +186,32 @@
return os.pathsep.join([lib_jars, flink_python_jar])
+def construct_hadoop_classpath(env):
+ flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
+
+ hadoop_conf_dir = ""
+ if 'HADOOP_CONF_DIR' not in env and 'HADOOP_CLASSPATH' not in env:
+ if os.path.isdir("/etc/hadoop/conf"):
+ print("Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or"
+ "HADOOP_CLASSPATH was set.")
+ hadoop_conf_dir = "/etc/hadoop/conf"
+
+ hbase_conf_dir = ""
+ if 'HBASE_CONF_DIR' not in env:
+ if os.path.isdir("/etc/hbase/conf"):
+ print("Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.")
+ hbase_conf_dir = "/etc/hbase/conf"
+
+ return os.pathsep.join(
+ [env.get("HADOOP_CLASSPATH", ""),
+ env.get("YARN_CONF_DIR",
+ read_from_config(KEY_ENV_YARN_CONF_DIR, "", flink_conf_file)),
+ env.get("HADOOP_CONF_DIR",
+ read_from_config(KEY_ENV_HADOOP_CONF_DIR, hadoop_conf_dir, flink_conf_file)),
+ env.get("HBASE_CONF_DIR",
+ read_from_config(KEY_ENV_HBASE_CONF_DIR, hbase_conf_dir, flink_conf_file))])
+
+
def download_apache_avro():
"""
Currently we need to download the Apache Avro manually to avoid test failure caused by the avro
@@ -199,32 +277,21 @@
main_class, cluster_type, other_args)
-def prepare_environment_variable(env):
- flink_home = _find_flink_home()
- env = dict(env)
- env["FLINK_CONF_DIR"] = os.path.join(flink_home, "conf")
- env["FLINK_BIN_DIR"] = os.path.join(flink_home, "bin")
- if "FLINK_PLUGINS_DIR" not in env:
- env["FLINK_PLUGINS_DIR"] = os.path.join(flink_home, "plugins")
- if "FLINK_LIB_DIR" not in env:
- env["FLINK_LIB_DIR"] = os.path.join(flink_home, "lib")
- if "FLINK_OPT_DIR" not in env:
- env["FLINK_OPT_DIR"] = os.path.join(flink_home, "opt")
- return env
-
-
def launch_gateway_server_process(env, args):
- java_executable = find_java_executable()
- log_settings = construct_log_settings()
- classpath = construct_classpath()
- env = prepare_environment_variable(env)
- if "FLINK_TESTING" in env:
- download_apache_avro()
- classpath = os.pathsep.join([classpath, construct_test_classpath()])
+ prepare_environment_variables(env)
program_args = construct_program_args(args)
if program_args.cluster_type == "local":
- command = [java_executable] + log_settings + ["-cp", classpath, program_args.main_class] \
- + program_args.other_args
+ java_executable = find_java_executable()
+ log_settings = construct_log_settings(env)
+ jvm_args = env.get('JVM_ARGS', '')
+ jvm_opts = get_jvm_opts(env)
+ classpath = os.pathsep.join(
+ [construct_flink_classpath(env), construct_hadoop_classpath(env)])
+ if "FLINK_TESTING" in env:
+ download_apache_avro()
+ classpath = os.pathsep.join([classpath, construct_test_classpath()])
+ command = [java_executable, jvm_args, jvm_opts] + log_settings \
+ + ["-cp", classpath, program_args.main_class] + program_args.other_args
else:
command = [os.path.join(env["FLINK_BIN_DIR"], "flink"), "run"] + program_args.other_args \
+ ["-c", program_args.main_class]
@@ -234,7 +301,8 @@
# ignore ctrl-c / SIGINT
signal.signal(signal.SIGINT, signal.SIG_IGN)
preexec_fn = preexec_func
- return Popen(command, stdin=PIPE, preexec_fn=preexec_fn, env=env)
+ return Popen(list(filter(lambda c: len(c) != 0, command)),
+ stdin=PIPE, preexec_fn=preexec_fn, env=env)
if __name__ == "__main__":