[FLINK-18880][python] Respect configurations defined in flink-conf.yaml and environment variables when executing in local mode

This closes #17216.
diff --git a/flink-python/pyflink/pyflink_gateway_server.py b/flink-python/pyflink/pyflink_gateway_server.py
index b137748..a05bbe1 100644
--- a/flink-python/pyflink/pyflink_gateway_server.py
+++ b/flink-python/pyflink/pyflink_gateway_server.py
@@ -30,27 +30,41 @@
 
 from pyflink.find_flink_home import _find_flink_home, _find_flink_source_root
 
+KEY_ENV_LOG_DIR = "env.log.dir"
+KEY_ENV_YARN_CONF_DIR = "env.yarn.conf.dir"
+KEY_ENV_HADOOP_CONF_DIR = "env.hadoop.conf.dir"
+KEY_ENV_HBASE_CONF_DIR = "env.hbase.conf.dir"
+KEY_ENV_JAVA_HOME = "env.java.home"
+KEY_ENV_JAVA_OPTS = "env.java.opts"
+
 
 def on_windows():
     return platform.system() == "Windows"
 
 
-def find_java_executable():
-    java_executable = "java.exe" if on_windows() else "java"
-    flink_home = _find_flink_home()
-    flink_conf_path = os.path.join(flink_home, "conf", "flink-conf.yaml")
-    java_home = None
-
+def read_from_config(key, default_value, flink_conf_file):
+    """Return the last value of ``key`` in flink_conf_file, or default_value if absent."""
     # get the realpath of tainted path value to avoid CWE22 problem that constructs a path or URI
     # using the tainted value and might allow an attacker to access, modify, or test the existence
     # of critical or sensitive files.
-    real_flink_conf_path = os.path.realpath(flink_conf_path)
-    if os.path.isfile(real_flink_conf_path):
-        with open(real_flink_conf_path, "r") as f:
-            flink_conf_yaml = f.read()
-        java_homes = re.findall(r'^[ ]*env\.java\.home[ ]*: ([^#]*).*$', flink_conf_yaml)
-        if len(java_homes) > 1:
-            java_home = java_homes[len(java_homes) - 1].strip()
+    value = default_value
+    real_conf_file = os.path.realpath(flink_conf_file)
+    if not os.path.isfile(real_conf_file):
+        return value  # no config file: fall back to the default
+    with open(real_conf_file, "r") as f:
+        for line in f:
+            # ignore comments (also indented ones) and lines without a "key: value" pair
+            k, sep, v = line.partition(":")
+            if sep and not k.lstrip().startswith("#") and k.strip() == key:
+                value = v.strip()  # the last occurrence of the key wins
+    return value
+
+
+def find_java_executable():
+    java_executable = "java.exe" if on_windows() else "java"
+    flink_home = _find_flink_home()
+    flink_conf_file = os.path.join(flink_home, "conf", "flink-conf.yaml")
+    java_home = read_from_config(KEY_ENV_JAVA_HOME, None, flink_conf_file)
 
     if java_home is None and "JAVA_HOME" in os.environ:
         java_home = os.environ["JAVA_HOME"]
@@ -61,7 +75,41 @@
     return java_executable
 
 
-def construct_log_settings():
+def prepare_environment_variables(env):
+    """
+    Fill ``env`` in place with the Flink directory variables.
+
+    FLINK_CONF_DIR, FLINK_LIB_DIR, FLINK_OPT_DIR and FLINK_PLUGINS_DIR keep a value already
+    present in ``env`` (resolved with os.path.realpath); otherwise they point to the matching
+    sub-directory of the Flink home. FLINK_BIN_DIR is always set to the "bin" sub-directory of
+    the Flink home, even if ``env`` already contains it.
+
+    :param env: mutable mapping of environment variables; modified in place, nothing is returned.
+    """
+    # get the realpath of tainted path value to avoid CWE22 problem that constructs a path or URI
+    # using the tainted value and might allow an attacker to access, modify, or test the existence
+    # of critical or sensitive files.
+    real_flink_home = os.path.realpath(_find_flink_home())
+
+    # (variable name, sub-directory of the Flink home used when the variable is not set)
+    overridable_directories = [
+        ('FLINK_CONF_DIR', "conf"),
+        ('FLINK_LIB_DIR', "lib"),
+        ('FLINK_OPT_DIR', "opt"),
+        ('FLINK_PLUGINS_DIR', "plugins"),
+    ]
+    for variable, sub_directory in overridable_directories:
+        if variable in env:
+            directory = os.path.realpath(env[variable])
+        else:
+            directory = os.path.join(real_flink_home, sub_directory)
+        env[variable] = directory
+
+    # FLINK_BIN_DIR is not overridable: it is always derived from the Flink home.
+    env["FLINK_BIN_DIR"] = os.path.join(real_flink_home, "bin")
+
+
+def construct_log_settings(env):
     templates = [
         "-Dlog.file=${flink_log_dir}/flink-${flink_ident_string}-python-${hostname}.log",
         "-Dlog4j.configuration=${log4j_properties}",
@@ -69,25 +117,28 @@
         "-Dlogback.configurationFile=${logback_xml}"
     ]
 
-    flink_home = _find_flink_home()
-    flink_conf_dir = os.path.join(flink_home, "conf")
-    if "FLINK_LOG_DIR" in os.environ:
-        flink_log_dir = os.environ["FLINK_LOG_DIR"]
-    else:
-        flink_log_dir = os.path.join(flink_home, "log")
+    flink_home = os.path.realpath(_find_flink_home())
+    flink_conf_dir = env['FLINK_CONF_DIR']
+    flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
 
-    if "LOG4J_PROPERTIES" in os.environ:
-        log4j_properties = os.environ["LOG4J_PROPERTIES"]
+    if "FLINK_LOG_DIR" in env:
+        flink_log_dir = env["FLINK_LOG_DIR"]
+    else:
+        flink_log_dir = read_from_config(
+            KEY_ENV_LOG_DIR, os.path.join(flink_home, "log"), flink_conf_file)
+
+    if "LOG4J_PROPERTIES" in env:
+        log4j_properties = env["LOG4J_PROPERTIES"]
     else:
         log4j_properties = "%s/log4j-cli.properties" % flink_conf_dir
 
-    if "LOGBACK_XML" in os.environ:
-        logback_xml = os.environ["LOGBACK_XML"]
+    if "LOGBACK_XML" in env:
+        logback_xml = env["LOGBACK_XML"]
     else:
         logback_xml = "%s/logback.xml" % flink_conf_dir
 
-    if "FLINK_IDENT_STRING" in os.environ:
-        flink_ident_string = os.environ["FLINK_IDENT_STRING"]
+    if "FLINK_IDENT_STRING" in env:
+        flink_ident_string = env["FLINK_IDENT_STRING"]
     else:
         flink_ident_string = getpass.getuser()
 
@@ -103,20 +154,21 @@
     return log_settings
 
 
-def construct_classpath():
+def get_jvm_opts(env):
+    """Return FLINK_ENV_JAVA_OPTS from env, else env.java.opts from flink-conf.yaml, unquoted."""
+    conf_path = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
+    # the config value is looked up first and only serves as default for the env lookup
+    configured_opts = read_from_config(KEY_ENV_JAVA_OPTS, "", conf_path)
+    opts = env.get('FLINK_ENV_JAVA_OPTS', configured_opts)
+    # Remove leading and ending double quotes (if present) of value
+    return opts.strip("\"")
+
+
+def construct_flink_classpath(env):
     flink_home = _find_flink_home()
-    # get the realpath of tainted path value to avoid CWE22 problem that constructs a path or URI
-    # using the tainted value and might allow an attacker to access, modify, or test the existence
-    # of critical or sensitive files.
-    real_flink_home = os.path.realpath(flink_home)
-    if 'FLINK_LIB_DIR' in os.environ:
-        flink_lib_directory = os.path.realpath(os.environ['FLINK_LIB_DIR'])
-    else:
-        flink_lib_directory = os.path.join(real_flink_home, "lib")
-    if 'FLINK_OPT_DIR' in os.environ:
-        flink_opt_directory = os.path.realpath(os.environ['FLINK_OPT_DIR'])
-    else:
-        flink_opt_directory = os.path.join(real_flink_home, "opt")
+    flink_lib_directory = env['FLINK_LIB_DIR']
+    flink_opt_directory = env['FLINK_OPT_DIR']
+
     if on_windows():
         # The command length is limited on Windows. To avoid the problem we should shorten the
         # command length as much as possible.
@@ -134,6 +186,32 @@
     return os.pathsep.join([lib_jars, flink_python_jar])
 
 
+def construct_hadoop_classpath(env):
+    """Return HADOOP_CLASSPATH plus the YARN, Hadoop and HBase conf dirs, joined by os.pathsep."""
+    flink_conf_file = os.path.join(env['FLINK_CONF_DIR'], "flink-conf.yaml")
+
+    hadoop_conf_dir = ""  # default used only when neither env nor flink-conf.yaml sets it
+    if 'HADOOP_CONF_DIR' not in env and 'HADOOP_CLASSPATH' not in env \
+            and os.path.isdir("/etc/hadoop/conf"):
+        print("Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or "
+              "HADOOP_CLASSPATH was set.")
+        hadoop_conf_dir = "/etc/hadoop/conf"
+
+    hbase_conf_dir = ""  # default used only when neither env nor flink-conf.yaml sets it
+    if 'HBASE_CONF_DIR' not in env and os.path.isdir("/etc/hbase/conf"):
+        print("Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.")
+        hbase_conf_dir = "/etc/hbase/conf"
+
+    return os.pathsep.join(
+        [env.get("HADOOP_CLASSPATH", ""),
+         env.get("YARN_CONF_DIR",
+                 read_from_config(KEY_ENV_YARN_CONF_DIR, "", flink_conf_file)),
+         env.get("HADOOP_CONF_DIR",
+                 read_from_config(KEY_ENV_HADOOP_CONF_DIR, hadoop_conf_dir, flink_conf_file)),
+         env.get("HBASE_CONF_DIR",
+                 read_from_config(KEY_ENV_HBASE_CONF_DIR, hbase_conf_dir, flink_conf_file))])
+
+
 def download_apache_avro():
     """
     Currently we need to download the Apache Avro manually to avoid test failure caused by the avro
@@ -199,32 +277,21 @@
         main_class, cluster_type, other_args)
 
 
-def prepare_environment_variable(env):
-    flink_home = _find_flink_home()
-    env = dict(env)
-    env["FLINK_CONF_DIR"] = os.path.join(flink_home, "conf")
-    env["FLINK_BIN_DIR"] = os.path.join(flink_home, "bin")
-    if "FLINK_PLUGINS_DIR" not in env:
-        env["FLINK_PLUGINS_DIR"] = os.path.join(flink_home, "plugins")
-    if "FLINK_LIB_DIR" not in env:
-        env["FLINK_LIB_DIR"] = os.path.join(flink_home, "lib")
-    if "FLINK_OPT_DIR" not in env:
-        env["FLINK_OPT_DIR"] = os.path.join(flink_home, "opt")
-    return env
-
-
 def launch_gateway_server_process(env, args):
-    java_executable = find_java_executable()
-    log_settings = construct_log_settings()
-    classpath = construct_classpath()
-    env = prepare_environment_variable(env)
-    if "FLINK_TESTING" in env:
-        download_apache_avro()
-        classpath = os.pathsep.join([classpath, construct_test_classpath()])
+    prepare_environment_variables(env)
     program_args = construct_program_args(args)
     if program_args.cluster_type == "local":
-        command = [java_executable] + log_settings + ["-cp", classpath, program_args.main_class] \
-            + program_args.other_args
+        java_executable = find_java_executable()
+        log_settings = construct_log_settings(env)
+        jvm_args = env.get('JVM_ARGS', '')
+        jvm_opts = get_jvm_opts(env)
+        classpath = os.pathsep.join(
+            [construct_flink_classpath(env), construct_hadoop_classpath(env)])
+        if "FLINK_TESTING" in env:
+            download_apache_avro()
+            classpath = os.pathsep.join([classpath, construct_test_classpath()])
+        command = [java_executable, jvm_args, jvm_opts] + log_settings \
+            + ["-cp", classpath, program_args.main_class] + program_args.other_args
     else:
         command = [os.path.join(env["FLINK_BIN_DIR"], "flink"), "run"] + program_args.other_args \
             + ["-c", program_args.main_class]
@@ -234,7 +301,8 @@
             # ignore ctrl-c / SIGINT
             signal.signal(signal.SIGINT, signal.SIG_IGN)
         preexec_fn = preexec_func
-    return Popen(command, stdin=PIPE, preexec_fn=preexec_fn, env=env)
+    return Popen(list(filter(lambda c: len(c) != 0, command)),
+                 stdin=PIPE, preexec_fn=preexec_fn, env=env)
 
 
 if __name__ == "__main__":