[FLINK-31899] Upgrade Flink version of Flink ML to 1.17.0
This closes #235.
diff --git a/.github/workflows/python-tests.yml b/.github/workflows/python-tests.yml
index 24b74ba..9013069 100644
--- a/.github/workflows/python-tests.yml
+++ b/.github/workflows/python-tests.yml
@@ -31,7 +31,7 @@
strategy:
matrix:
os: ['macOS-latest', 'ubuntu-20.04']
- python-version: ['3.6', '3.7', '3.8']
+ python-version: ['3.7', '3.8']
steps:
- name: Checkout code
uses: actions/checkout@v2
diff --git a/docs/content/docs/try-flink-ml/java/quick-start.md b/docs/content/docs/try-flink-ml/java/quick-start.md
index f92cb71..c683fc0 100644
--- a/docs/content/docs/try-flink-ml/java/quick-start.md
+++ b/docs/content/docs/try-flink-ml/java/quick-start.md
@@ -50,7 +50,7 @@
## Download Flink
-[Download 1.16 or a higher version of
+[Download 1.17 or a higher version of
Flink](https://flink.apache.org/downloads.html), then extract the archive:
```shell
diff --git a/docs/content/docs/try-flink-ml/python/quick-start.md b/docs/content/docs/try-flink-ml/python/quick-start.md
index 4ee84df..cfbbb30 100644
--- a/docs/content/docs/try-flink-ml/python/quick-start.md
+++ b/docs/content/docs/try-flink-ml/python/quick-start.md
@@ -60,10 +60,10 @@
{{< stable >}}
- Java 8
-- Python 3.6, 3.7 or 3.8 {{< /stable >}} {{< unstable >}}
+- Python 3.7 or 3.8 {{< /stable >}} {{< unstable >}}
- Java 8
- Maven 3
-- Python 3.6, 3.7 or 3.8 {{< /unstable >}}
+- Python 3.7 or 3.8 {{< /unstable >}}
{{< stable >}}
@@ -248,7 +248,7 @@
### Download Flink
-[Download 1.16 or a higher version of
+[Download 1.17 or a higher version of
Flink](https://flink.apache.org/downloads.html), then extract the archive:
```shell
diff --git a/flink-ml-benchmark/README.md b/flink-ml-benchmark/README.md
index b46244e..0a98a76 100644
--- a/flink-ml-benchmark/README.md
+++ b/flink-ml-benchmark/README.md
@@ -7,7 +7,7 @@
### Install Flink
-Please make sure Flink 1.16 or higher version has been installed in your local
+Please make sure Flink 1.17 or higher version has been installed in your local
environment. You can refer to the [local
installation](https://nightlies.apache.org/flink/flink-docs-master/docs/try-flink/local_installation/)
instruction on Flink's document website for how to achieve this.
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/BroadcastOutputFactory.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/BroadcastOutputFactory.java
index 29fc3c7..ad49281 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/BroadcastOutputFactory.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/BroadcastOutputFactory.java
@@ -44,6 +44,10 @@
OutputReflectionContext outputReflectionContext = new OutputReflectionContext();
+ if (outputReflectionContext.isCountingOutput(output)) {
+ output = outputReflectionContext.getCountingInternalOutput(output);
+ }
+
List<BroadcastOutput<OUT>> internalOutputs = new ArrayList<>();
if (outputReflectionContext.isBroadcastingOutput(output)) {
Output<StreamRecord<OUT>>[] rawOutputs =
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/OutputReflectionContext.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/OutputReflectionContext.java
index 6ce8210..69a5060 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/OutputReflectionContext.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/broadcast/OutputReflectionContext.java
@@ -22,6 +22,7 @@
import org.apache.flink.iteration.utils.ReflectionUtils;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.api.operators.CountingOutput;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -41,6 +42,7 @@
private final Field recordWriterField;
private final Field recordWriterSerializationDelegateField;
private final Field serializationDelegateSerializerField;
+ private final Field countingOutputField;
public OutputReflectionContext() {
try {
@@ -62,6 +64,8 @@
RecordWriterOutput.class, "serializationDelegate");
this.serializationDelegateSerializerField =
ReflectionUtils.getClassField(SerializationDelegate.class, "serializer");
+ this.countingOutputField =
+ ReflectionUtils.getClassField(CountingOutput.class, "output");
} catch (Exception e) {
throw new RuntimeException("Failed to initialize the OutputReflectionContext", e);
}
@@ -79,20 +83,26 @@
return RecordWriterOutput.class.isAssignableFrom(rawOutput.getClass());
}
+ public boolean isCountingOutput(Output<?> rawOutput) {
+ return CountingOutput.class.isAssignableFrom(rawOutput.getClass());
+ }
+
public <OUT> Output<StreamRecord<OUT>>[] getBroadcastingInternalOutputs(Object output) {
return ReflectionUtils.getFieldValue(output, broadcastingOutputsField);
}
+ public <OUT> Output<StreamRecord<OUT>> getCountingInternalOutput(Object output) {
+ return ReflectionUtils.getFieldValue(output, countingOutputField);
+ }
+
public OutputTag<?> getChainingOutputTag(Object output) {
return ReflectionUtils.getFieldValue(output, chainingOutputTagField);
}
- @SuppressWarnings("unchecked")
public RecordWriter<SerializationDelegate<StreamElement>> getRecordWriter(Object output) {
return ReflectionUtils.getFieldValue(output, recordWriterField);
}
- @SuppressWarnings("unchecked")
public TypeSerializer<StreamElement> getRecordWriterTypeSerializer(Object output) {
SerializationDelegate<StreamElement> serializationDelegate =
ReflectionUtils.getFieldValue(output, recordWriterSerializationDelegateField);
diff --git a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/utils/ReflectionUtils.java b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/utils/ReflectionUtils.java
index 3dd5958..3eab395 100644
--- a/flink-ml-iteration/src/main/java/org/apache/flink/iteration/utils/ReflectionUtils.java
+++ b/flink-ml-iteration/src/main/java/org/apache/flink/iteration/utils/ReflectionUtils.java
@@ -18,8 +18,11 @@
package org.apache.flink.iteration.utils;
+import org.apache.flink.util.Preconditions;
+
import java.lang.reflect.Field;
import java.lang.reflect.Method;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -69,15 +72,59 @@
String methodName,
List<Class<?>> parameterClass,
List<Object> parameters) {
+
+ Method method;
try {
- Method method =
+ method =
declaredClass.getDeclaredMethod(
methodName, parameterClass.toArray(new Class[0]));
method.setAccessible(true);
+ } catch (NoSuchMethodException e1) {
+ try {
+ method = declaredClass.getMethod(methodName, parameterClass.toArray(new Class[0]));
+ } catch (NoSuchMethodException e2) {
+ throw new RuntimeException(
+ "Failed to get method" + methodName + " from " + targetObject, e2);
+ }
+ }
+
+ try {
return (T) method.invoke(targetObject, parameters.toArray());
} catch (Exception e) {
throw new RuntimeException(
- "Failed to get method" + methodName + " from " + targetObject, e);
+ "Failed to invoke method" + methodName + " from " + targetObject, e);
+ }
+ }
+
+ /**
+ * The utility method to call method with the specific name. The method can be a public method
+ * of the class or one inherited from superclasses and superinterfaces.
+ *
+ * <p>Note that this method is added only for bypassing the existing bug in Py4j. It doesn't
+ * validate the classes of parameters so it can only deal with the classes that have only one
+ * method with the specific name.
+ *
+ * <p>TODO: Remove this method after the Py4j bug is fixed.
+ */
+ @SuppressWarnings("unchecked")
+ public static <T> T callMethod(
+ Object targetObject, Class<?> declaredClass, String methodName, Object[] parameters) {
+ List<Method> methods = new ArrayList<>();
+ for (Method m : declaredClass.getMethods()) {
+ if (methodName.equals(m.getName())) {
+ methods.add(m);
+ }
+ }
+ Preconditions.checkState(
+ methods.size() == 1,
+ "Only one method with name %s is permitted to be declared in %s",
+ methodName,
+ declaredClass);
+ try {
+ return (T) methods.get(0).invoke(targetObject, parameters);
+ } catch (Exception e) {
+ throw new RuntimeException(
+ "Failed to invoke method" + methodName + " from " + targetObject, e);
}
}
}
diff --git a/flink-ml-python/README.md b/flink-ml-python/README.md
index 1f3f947..31038ff 100644
--- a/flink-ml-python/README.md
+++ b/flink-ml-python/README.md
@@ -7,7 +7,7 @@
Prerequisites for building apache-flink-ml:
* Unix-like environment (we use Linux, Mac OS X)
-* Python version(3.6, 3.7 or 3.8) is required
+* Python version(3.7 or 3.8) is required
Then go to the root directory of flink-ml-python source code and run this command to build the sdist package of apache-flink-ml:
```bash
diff --git a/flink-ml-python/dev/dev-requirements.txt b/flink-ml-python/dev/dev-requirements.txt
index b8ac56c..ee1f703 100755
--- a/flink-ml-python/dev/dev-requirements.txt
+++ b/flink-ml-python/dev/dev-requirements.txt
@@ -14,13 +14,11 @@
# limitations under the License.
setuptools>=18.0
wheel
-apache-flink==1.16.1
-pandas>=1.3.0,<1.4.0; python_version >= '3.7'
-pandas>=1.0,<1.2.0; python_version < '3.7'
+apache-flink==1.17.0
+pandas>=1.3.0,<1.4.0
jsonpickle==2.0.0
-cloudpickle==2.1.0
+cloudpickle==2.2.0
pytest==4.4.1
flake8==4.0.1
mypy==0.910
-numpy>=1.21.4,<1.22.0; python_version >= '3.7'
-numpy>=1.14.3,<1.20; python_version < '3.7'
\ No newline at end of file
+numpy>=1.21.4,<1.22.0
\ No newline at end of file
diff --git a/flink-ml-python/pyflink/ml/wrapper.py b/flink-ml-python/pyflink/ml/wrapper.py
index 77a056f..708e5a5 100644
--- a/flink-ml-python/pyflink/ml/wrapper.py
+++ b/flink-ml-python/pyflink/ml/wrapper.py
@@ -114,7 +114,14 @@
converter = default_converter
java_param_name = snake_to_camel(param.name)
set_method_name = ''.join(['set', java_param_name[0].upper(), java_param_name[1:]])
- getattr(self._java_obj, set_method_name)(converter.to_java(value))
+
+ gateway = get_gateway()
+ gateway.jvm.org.apache.flink.iteration.utils.ReflectionUtils.callMethod(
+ self._java_obj,
+ self._java_obj.getClass(),
+ set_method_name,
+ to_jarray(gateway.jvm.Object, [converter.to_java(value)])
+ )
return self
def get(self, param: Param):
@@ -124,7 +131,14 @@
converter = default_converter
java_param_name = snake_to_camel(param.name)
get_method_name = ''.join(['get', java_param_name[0].upper(), java_param_name[1:]])
- return converter.to_python(getattr(self._java_obj, get_method_name)())
+
+ gateway = get_gateway()
+ result = gateway.jvm.org.apache.flink.iteration.utils.ReflectionUtils.callMethod(
+ self._java_obj,
+ self._java_obj.getClass(),
+ get_method_name
+ )
+ return converter.to_python(result)
def get_param_map(self) -> Dict[Param, Any]:
return self._java_obj.getParamMap()
diff --git a/flink-ml-python/setup.py b/flink-ml-python/setup.py
index b263f5e..234ebfa 100644
--- a/flink-ml-python/setup.py
+++ b/flink-ml-python/setup.py
@@ -23,8 +23,8 @@
from setuptools import setup
-if sys.version_info < (3, 6) or sys.version_info >= (3, 9):
- print("Only Python versions between 3.6 and 3.8 (inclusive) are supported for Flink ML. "
+if sys.version_info < (3, 7) or sys.version_info >= (3, 9):
+ print("Only Python versions between 3.7 and 3.8 (inclusive) are supported for Flink ML. "
"The current Python version is %s." % python_version(), file=sys.stderr)
sys.exit(-1)
@@ -118,12 +118,9 @@
license='https://www.apache.org/licenses/LICENSE-2.0',
author='Apache Software Foundation',
author_email='dev@flink.apache.org',
- python_requires='>=3.6',
- install_requires=['apache-flink==1.16.1', 'jsonpickle==2.0.0', 'cloudpickle==2.1.0',
- 'pandas>=1.0,<1.2.0; python_full_version < "3.7"',
- 'pandas>=1.3.0,<1.4.0; python_full_version >= "3.7"',
- 'numpy>=1.14.3,<1.20; python_full_version < "3.7"',
- 'numpy>=1.21.4,<1.22.0; python_full_version >= "3.7"'],
+ python_requires='>=3.7',
+ install_requires=['apache-flink==1.17.0', 'jsonpickle==2.0.0', 'cloudpickle==2.2.0',
+ 'pandas>=1.3.0,<1.4.0', 'numpy>=1.21.4,<1.22.0'],
tests_require=['pytest==4.4.1'],
description='Apache Flink ML Python API',
long_description=long_description,
@@ -131,7 +128,6 @@
classifiers=[
'Development Status :: 5 - Production/Stable',
'License :: OSI Approved :: Apache Software License',
- 'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8'],
)
diff --git a/pom.xml b/pom.xml
index 9bb29be..fa5c391 100644
--- a/pom.xml
+++ b/pom.xml
@@ -77,7 +77,7 @@
<junit.version>4.13.2</junit.version>
<flink.forkCount>1C</flink.forkCount>
<flink.reuseForks>true</flink.reuseForks>
- <flink.version>1.16.1</flink.version>
+ <flink.version>1.17.0</flink.version>
<zookeeper.version>3.6.3</zookeeper.version>
<hadoop.version>2.10.1</hadoop.version>