[FLINK-24243][python] Cleanup code to use latest API to avoid warnings
This closes #17237.
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 283cfa3..50a3b6d 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -442,7 +442,7 @@
from pyflink.table.expressions import col
add_three = udf(plus_three, result_type=DataTypes.BIGINT())
- tab = t_env.from_data_stream(ds, 'a') \
+ tab = t_env.from_data_stream(ds, col('a')) \
.select(add_three(col('a')))
result = [i[0] for i in tab.execute().collect()]
expected = [6, 7, 8, 9, 10]
diff --git a/flink-python/pyflink/fn_execution/state_impl.py b/flink-python/pyflink/fn_execution/state_impl.py
index d64dbeb..a424776 100644
--- a/flink-python/pyflink/fn_execution/state_impl.py
+++ b/flink-python/pyflink/fn_execution/state_impl.py
@@ -674,7 +674,7 @@
return state_key.SerializeToString()
-class RemovableConcatIterator(collections.Iterator):
+class RemovableConcatIterator(collections.abc.Iterator):
def __init__(self, internal_map_state, first, second):
self._first = first
diff --git a/flink-python/pyflink/fn_execution/utils/operation_utils.py b/flink-python/pyflink/fn_execution/utils/operation_utils.py
index ef8661c..f779cfd 100644
--- a/flink-python/pyflink/fn_execution/utils/operation_utils.py
+++ b/flink-python/pyflink/fn_execution/utils/operation_utils.py
@@ -16,7 +16,8 @@
# limitations under the License.
################################################################################
import datetime
-from collections import Generator
+from collections.abc import Generator
+
from functools import partial
from typing import Any, Tuple, Dict, List
diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py
index a8c8693..925188a 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -234,7 +234,7 @@
self.assertEqual('currentTimestamp()', str(current_timestamp()))
self.assertEqual('localTime()', str(local_time()))
self.assertEqual('localTimestamp()', str(local_timestamp()))
- self.assertEquals('toTimestampLtz(123, 0)', str(to_timestamp_ltz(123, 0)))
+ self.assertEqual('toTimestampLtz(123, 0)', str(to_timestamp_ltz(123, 0)))
self.assertEqual("temporalOverlaps(cast('2:55:00', TIME(0)), 3600000, "
"cast('3:30:00', TIME(0)), 7200000)",
str(temporal_overlaps(
diff --git a/flink-python/pyflink/table/tests/test_pandas_conversion.py b/flink-python/pyflink/table/tests/test_pandas_conversion.py
index 6ece7b8..e4b6d60 100644
--- a/flink-python/pyflink/table/tests/test_pandas_conversion.py
+++ b/flink-python/pyflink/table/tests/test_pandas_conversion.py
@@ -18,7 +18,7 @@
import datetime
import decimal
-from pandas.util.testing import assert_frame_equal
+from pandas.testing import assert_frame_equal
from pyflink.common import Row
from pyflink.table.types import DataTypes
diff --git a/flink-python/pyflink/table/tests/test_pandas_udf.py b/flink-python/pyflink/table/tests/test_pandas_udf.py
index 1d838df..415b23d 100644
--- a/flink-python/pyflink/table/tests/test_pandas_udf.py
+++ b/flink-python/pyflink/table/tests/test_pandas_udf.py
@@ -284,11 +284,11 @@
def test_invalid_pandas_udf(self):
- @udf(result_type=DataTypes.INT(), udf_type="pandas")
+ @udf(result_type=DataTypes.INT(), func_type="pandas")
def length_mismatch(i):
return i[1:]
- @udf(result_type=DataTypes.INT(), udf_type="pandas")
+ @udf(result_type=DataTypes.INT(), func_type="pandas")
def result_type_not_series(i):
return i.iloc[0]
diff --git a/flink-python/pyflink/table/udf.py b/flink-python/pyflink/table/udf.py
index 16d8931..f7d9b4e 100644
--- a/flink-python/pyflink/table/udf.py
+++ b/flink-python/pyflink/table/udf.py
@@ -323,7 +323,7 @@
if input_types is not None:
from pyflink.table.types import RowType
- if not isinstance(input_types, collections.Iterable) \
+ if not isinstance(input_types, collections.abc.Iterable) \
or isinstance(input_types, RowType):
input_types = [input_types]
@@ -444,7 +444,7 @@
func, input_types, "general", deterministic, name)
from pyflink.table.types import RowType
- if not isinstance(result_types, collections.Iterable) \
+ if not isinstance(result_types, collections.abc.Iterable) \
or isinstance(result_types, RowType):
result_types = [result_types]