[FLINK-31899] Upgrade Flink version of Flink ML to 1.17.0

This closes #235.
diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml
index 24b74ba..9013069 100644
--- a/.github/workflows/python-tests.yml
+++ b/.github/workflows/python-tests.yml
@@ -31,7 +31,7 @@
     strategy:
       matrix:
         os: ['macOS-latest', 'ubuntu-20.04']
-        python-version: ['3.6', '3.7', '3.8']
+        python-version: ['3.7', '3.8']
     steps:
       - name: Checkout code
         uses: actions/checkout@v2
diff --git a/docs/content/docs/try-flink-ml/java/quick-start.md b/docs/content/docs/try-flink-ml/java/quick-start.md
index f92cb71..c683fc0 100644
--- a/docs/content/docs/try-flink-ml/java/quick-start.md
+++ b/docs/content/docs/try-flink-ml/java/quick-start.md
@@ -50,7 +50,7 @@
 
 ## Download Flink
 
-[Download 1.16 or a higher version of
+[Download 1.17 or a higher version of
 Flink](https://flink.apache.org/downloads.html), then extract the archive:
 
 ```shell
diff --git a/docs/content/docs/try-flink-ml/python/quick-start.md b/docs/content/docs/try-flink-ml/python/quick-start.md
index 4ee84df..cfbbb30 100644
--- a/docs/content/docs/try-flink-ml/python/quick-start.md
+++ b/docs/content/docs/try-flink-ml/python/quick-start.md
@@ -60,10 +60,10 @@
 
 {{< stable >}}
 - Java 8
-- Python 3.6, 3.7 or 3.8 {{< /stable >}} {{< unstable >}}
+- Python 3.7 or 3.8 {{< /stable >}} {{< unstable >}}
 - Java 8
 - Maven 3
-- Python 3.6, 3.7 or 3.8 {{< /unstable >}}
+- Python 3.7 or 3.8 {{< /unstable >}}
 
 {{< stable >}}
 
@@ -248,7 +248,7 @@
 
 ### Download Flink
 
-[Download 1.16 or a higher version of
+[Download 1.17 or a higher version of
 Flink](https://flink.apache.org/downloads.html), then extract the archive:
 
 ```shell
diff --git a/flink-ml-benchmark/README.md b/flink-ml-benchmark/README.md
index b46244e..0a98a76 100644
--- a/flink-ml-benchmark/README.md
+++ b/flink-ml-benchmark/README.md
@@ -7,7 +7,7 @@
 
 ### Install Flink
 
-Please make sure Flink 1.16 or higher version has been installed in your local
+Please make sure Flink 1.17 or higher version has been installed in your local
 environment. You can refer to the [local
 installation](https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/local_installation/)
 instruction on Flink's document website for how to achieve this.
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/BroadcastOutputFactory.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/BroadcastOutputFactory.java
index 29fc3c7..ad49281 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/BroadcastOutputFactory.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/BroadcastOutputFactory.java
@@ -44,6 +44,10 @@
 
         OutputReflectionContext outputReflectionContext = new OutputReflectionContext();
 
+        if (outputReflectionContext.isCountingOutput(output)) {
+            output = outputReflectionContext.getCountingInternalOutput(output);
+        }
+
         List<BroadcastOutput<OUT>> internalOutputs = new ArrayList<>();
         if (outputReflectionContext.isBroadcastingOutput(output)) {
             Output<StreamRecord<OUT>>[] rawOutputs =
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/OutputReflectionContext.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/OutputReflectionContext.java
index 6ce8210..69a5060 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/OutputReflectionContext.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/OutputReflectionContext.java
@@ -22,6 +22,7 @@
 import org.apache.flink.iteration.utils.ReflectionUtils;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.operators.CountingOutput;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -41,6 +42,7 @@
     private final Field recordWriterField;
     private final Field recordWriterSerializationDelegateField;
     private final Field serializationDelegateSerializerField;
+    private final Field countingOutputField;
 
     public OutputReflectionContext() {
         try {
@@ -62,6 +64,8 @@
                             RecordWriterOutput.class, "serializationDelegate");
             this.serializationDelegateSerializerField =
                     ReflectionUtils.getClassField(SerializationDelegate.class, "serializer");
+            this.countingOutputField =
+                    ReflectionUtils.getClassField(CountingOutput.class, "output");
         } catch (Exception e) {
             throw new RuntimeException("Failed to initialize the OutputReflectionContext", e);
         }
@@ -79,20 +83,26 @@
         return RecordWriterOutput.class.isAssignableFrom(rawOutput.getClass());
     }
 
+    public boolean isCountingOutput(Output<?> rawOutput) {
+        return CountingOutput.class.isAssignableFrom(rawOutput.getClass());
+    }
+
     public <OUT> Output<StreamRecord<OUT>>[] getBroadcastingInternalOutputs(Object output) {
         return ReflectionUtils.getFieldValue(output, broadcastingOutputsField);
     }
 
+    public <OUT> Output<StreamRecord<OUT>> getCountingInternalOutput(Object output) {
+        return ReflectionUtils.getFieldValue(output, countingOutputField);
+    }
+
     public OutputTag<?> getChainingOutputTag(Object output) {
         return ReflectionUtils.getFieldValue(output, chainingOutputTagField);
     }
 
-    @SuppressWarnings("unchecked")
     public RecordWriter<SerializationDelegate<StreamElement>> getRecordWriter(Object output) {
         return ReflectionUtils.getFieldValue(output, recordWriterField);
     }
 
-    @SuppressWarnings("unchecked")
     public TypeSerializer<StreamElement> getRecordWriterTypeSerializer(Object output) {
         SerializationDelegate<StreamElement> serializationDelegate =
                 ReflectionUtils.getFieldValue(output, recordWriterSerializationDelegateField);
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/utils/ReflectionUtils.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/utils/ReflectionUtils.java
index 3dd5958..3eab395 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/utils/ReflectionUtils.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/utils/ReflectionUtils.java
@@ -18,8 +18,11 @@
 
 package org.apache.flink.iteration.utils;
 
+import org.apache.flink.util.Preconditions;
+
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 
@@ -69,15 +72,59 @@
             String methodName,
             List<Class<?>> parameterClass,
             List<Object> parameters) {
+
+        Method method;
         try {
-            Method method =
+            method =
                     declaredClass.getDeclaredMethod(
                             methodName, parameterClass.toArray(new Class[0]));
             method.setAccessible(true);
+        } catch (NoSuchMethodException e1) {
+            try {
+                method = declaredClass.getMethod(methodName, parameterClass.toArray(new Class[0]));
+            } catch (NoSuchMethodException e2) {
+                throw new RuntimeException(
+                        "Failed to get method" + methodName + " from " + targetObject, e2);
+            }
+        }
+
+        try {
             return (T) method.invoke(targetObject, parameters.toArray());
         } catch (Exception e) {
             throw new RuntimeException(
-                    "Failed to get method" + methodName + " from " + targetObject, e);
+                    "Failed to invoke method" + methodName + " from " + targetObject, e);
+        }
+    }
+
+    /**
+     * The utility method to call method with the specific name. The method can be a public method
+     * of the class or one inherited from superclasses and superinterfaces.
+     *
+     * <p>Note that this method is added only for bypassing the existing bug in Py4j. It doesn't
+     * validate the classes of parameters so it can only deal with the classes that have only one
+     * method with the specific name.
+     *
+     * <p>TODO: Remove this method after the Py4j bug is fixed.
+     */
+    @SuppressWarnings("unchecked")
+    public static <T> T callMethod(
+            Object targetObject, Class<?> declaredClass, String methodName, Object[] parameters) {
+        List<Method> methods = new ArrayList<>();
+        for (Method m : declaredClass.getMethods()) {
+            if (methodName.equals(m.getName())) {
+                methods.add(m);
+            }
+        }
+        Preconditions.checkState(
+                methods.size() == 1,
+                "Only one method with name %s is permitted to be declared in %s",
+                methodName,
+                declaredClass);
+        try {
+            return (T) methods.get(0).invoke(targetObject, parameters);
+        } catch (Exception e) {
+            throw new RuntimeException(
+                    "Failed to invoke method" + methodName + " from " + targetObject, e);
         }
     }
 }
diff --git a/flink-ml-python/README.md b/flink-ml-python/README.md
index 1f3f947..31038ff 100644
--- a/flink-ml-python/README.md
+++ b/flink-ml-python/README.md
@@ -7,7 +7,7 @@
 Prerequisites for building apache-flink-ml:
 
 * Unix-like environment (we use Linux, Mac OS X)
-* Python version(3.6, 3.7 or 3.8) is required
+* Python version(3.7 or 3.8) is required
 
 Then go to the root directory of flink-ml-python source code and run this command to build the sdist package of apache-flink-ml:
 ```bash
diff --git a/flink-ml-python/dev/dev-requirements.txt b/flink-ml-python/dev/dev-requirements.txt
index b8ac56c..ee1f703 100755
--- a/flink-ml-python/dev/dev-requirements.txt
+++ b/flink-ml-python/dev/dev-requirements.txt
@@ -14,13 +14,11 @@
 # limitations under the License.
 setuptools>=18.0
 wheel
-apache-flink==1.16.1
-pandas>=1.3.0,<1.4.0; python_version >= '3.7'
-pandas>=1.0,<1.2.0; python_version < '3.7'
+apache-flink==1.17.0
+pandas>=1.3.0,<1.4.0
 jsonpickle==2.0.0
-cloudpickle==2.1.0
+cloudpickle==2.2.0
 pytest==4.4.1
 flake8==4.0.1
 mypy==0.910
-numpy>=1.21.4,<1.22.0; python_version >= '3.7'
-numpy>=1.14.3,<1.20; python_version < '3.7'
\ No newline at end of file
+numpy>=1.21.4,<1.22.0
\ No newline at end of file
diff --git a/flink-ml-python/pyflink/ml/wrapper.py b/flink-ml-python/pyflink/ml/wrapper.py
index 77a056f..708e5a5 100644
--- a/flink-ml-python/pyflink/ml/wrapper.py
+++ b/flink-ml-python/pyflink/ml/wrapper.py
@@ -114,7 +114,14 @@
             converter = default_converter
         java_param_name = snake_to_camel(param.name)
         set_method_name = ''.join(['set', java_param_name[0].upper(), java_param_name[1:]])
-        getattr(self._java_obj, set_method_name)(converter.to_java(value))
+
+        gateway = get_gateway()
+        gateway.jvm.org.apache.flink.iteration.utils.ReflectionUtils.callMethod(
+            self._java_obj,
+            self._java_obj.getClass(),
+            set_method_name,
+            to_jarray(gateway.jvm.Object, [converter.to_java(value)])
+        )
         return self
 
     def get(self, param: Param):
@@ -124,7 +131,14 @@
             converter = default_converter
         java_param_name = snake_to_camel(param.name)
         get_method_name = ''.join(['get', java_param_name[0].upper(), java_param_name[1:]])
-        return converter.to_python(getattr(self._java_obj, get_method_name)())
+
+        gateway = get_gateway()
+        result = gateway.jvm.org.apache.flink.iteration.utils.ReflectionUtils.callMethod(
+            self._java_obj,
+            self._java_obj.getClass(),
+            get_method_name
+        )
+        return converter.to_python(result)
 
     def get_param_map(self) -> Dict[Param, Any]:
         return self._java_obj.getParamMap()
diff --git a/flink-ml-python/setup.py b/flink-ml-python/setup.py
index b263f5e..234ebfa 100644
--- a/flink-ml-python/setup.py
+++ b/flink-ml-python/setup.py
@@ -23,8 +23,8 @@
 
 from setuptools import setup
 
-if sys.version_info < (3, 6) or sys.version_info >= (3, 9):
-    print("Only Python versions between 3.6 and 3.8 (inclusive) are supported for Flink ML. "
+if sys.version_info < (3, 7) or sys.version_info >= (3, 9):
+    print("Only Python versions between 3.7 and 3.8 (inclusive) are supported for Flink ML. "
           "The current Python version is %s." % python_version(), file=sys.stderr)
     sys.exit(-1)
 
@@ -118,12 +118,9 @@
         license='https://www.apache.org/licenses/LICENSE-2.0',
         author='Apache Software Foundation',
         author_email='dev@flink.apache.org',
-        python_requires='>=3.6',
-        install_requires=['apache-flink==1.16.1', 'jsonpickle==2.0.0', 'cloudpickle==2.1.0',
-                          'pandas>=1.0,<1.2.0; python_full_version < "3.7"',
-                          'pandas>=1.3.0,<1.4.0; python_full_version >= "3.7"',
-                          'numpy>=1.14.3,<1.20; python_full_version < "3.7"',
-                          'numpy>=1.21.4,<1.22.0; python_full_version >= "3.7"'],
+        python_requires='>=3.7',
+        install_requires=['apache-flink==1.17.0', 'jsonpickle==2.0.0', 'cloudpickle==2.2.0',
+                          'pandas>=1.3.0,<1.4.0', 'numpy>=1.21.4,<1.22.0'],
         tests_require=['pytest==4.4.1'],
         description='Apache Flink ML Python API',
         long_description=long_description,
@@ -131,7 +128,6 @@
         classifiers=[
             'Development Status :: 5 - Production/Stable',
             'License :: OSI Approved :: Apache Software License',
-            'Programming Language :: Python :: 3.6',
             'Programming Language :: Python :: 3.7',
             'Programming Language :: Python :: 3.8'],
     )
diff --git a/pom.xml b/pom.xml
index 9bb29be..fa5c391 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,7 @@
     <junit.version>4.13.2</junit.version>
     <flink.forkCount>1C</flink.forkCount>
     <flink.reuseForks>true</flink.reuseForks>
-    <flink.version>1.16.1</flink.version>
+    <flink.version>1.17.0</flink.version>
     <zookeeper.version>3.6.3</zookeeper.version>
     <hadoop.version>2.10.1</hadoop.version>