[FLINK-24243][python] Cleanup code to use latest API to avoid warnings

This closes #17237.
diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
index 283cfa3..50a3b6d 100644
--- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment.py
@@ -442,7 +442,7 @@
         from pyflink.table.expressions import col
         add_three = udf(plus_three, result_type=DataTypes.BIGINT())
 
-        tab = t_env.from_data_stream(ds, 'a') \
+        tab = t_env.from_data_stream(ds, col('a')) \
                    .select(add_three(col('a')))
         result = [i[0] for i in tab.execute().collect()]
         expected = [6, 7, 8, 9, 10]
diff --git a/flink-python/pyflink/fn_execution/state_impl.py b/flink-python/pyflink/fn_execution/state_impl.py
index d64dbeb..a424776 100644
--- a/flink-python/pyflink/fn_execution/state_impl.py
+++ b/flink-python/pyflink/fn_execution/state_impl.py
@@ -674,7 +674,7 @@
         return state_key.SerializeToString()
 
 
-class RemovableConcatIterator(collections.Iterator):
+class RemovableConcatIterator(collections.abc.Iterator):
 
     def __init__(self, internal_map_state, first, second):
         self._first = first
diff --git a/flink-python/pyflink/fn_execution/utils/operation_utils.py b/flink-python/pyflink/fn_execution/utils/operation_utils.py
index ef8661c..f779cfd 100644
--- a/flink-python/pyflink/fn_execution/utils/operation_utils.py
+++ b/flink-python/pyflink/fn_execution/utils/operation_utils.py
@@ -16,7 +16,8 @@
 # limitations under the License.
 ################################################################################
 import datetime
-from collections import Generator
+from collections.abc import Generator
+
 from functools import partial
 
 from typing import Any, Tuple, Dict, List
diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py
index a8c8693..925188a 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -234,7 +234,7 @@
         self.assertEqual('currentTimestamp()', str(current_timestamp()))
         self.assertEqual('localTime()', str(local_time()))
         self.assertEqual('localTimestamp()', str(local_timestamp()))
-        self.assertEquals('toTimestampLtz(123, 0)', str(to_timestamp_ltz(123, 0)))
+        self.assertEqual('toTimestampLtz(123, 0)', str(to_timestamp_ltz(123, 0)))
         self.assertEqual("temporalOverlaps(cast('2:55:00', TIME(0)), 3600000, "
                          "cast('3:30:00', TIME(0)), 7200000)",
                          str(temporal_overlaps(
diff --git a/flink-python/pyflink/table/tests/test_pandas_conversion.py b/flink-python/pyflink/table/tests/test_pandas_conversion.py
index 6ece7b8..e4b6d60 100644
--- a/flink-python/pyflink/table/tests/test_pandas_conversion.py
+++ b/flink-python/pyflink/table/tests/test_pandas_conversion.py
@@ -18,7 +18,7 @@
 import datetime
 import decimal
 
-from pandas.util.testing import assert_frame_equal
+from pandas.testing import assert_frame_equal
 
 from pyflink.common import Row
 from pyflink.table.types import DataTypes
diff --git a/flink-python/pyflink/table/tests/test_pandas_udf.py b/flink-python/pyflink/table/tests/test_pandas_udf.py
index 1d838df..415b23d 100644
--- a/flink-python/pyflink/table/tests/test_pandas_udf.py
+++ b/flink-python/pyflink/table/tests/test_pandas_udf.py
@@ -284,11 +284,11 @@
 
     def test_invalid_pandas_udf(self):
 
-        @udf(result_type=DataTypes.INT(), udf_type="pandas")
+        @udf(result_type=DataTypes.INT(), func_type="pandas")
         def length_mismatch(i):
             return i[1:]
 
-        @udf(result_type=DataTypes.INT(), udf_type="pandas")
+        @udf(result_type=DataTypes.INT(), func_type="pandas")
         def result_type_not_series(i):
             return i.iloc[0]
 
diff --git a/flink-python/pyflink/table/udf.py b/flink-python/pyflink/table/udf.py
index 16d8931..f7d9b4e 100644
--- a/flink-python/pyflink/table/udf.py
+++ b/flink-python/pyflink/table/udf.py
@@ -323,7 +323,7 @@
 
         if input_types is not None:
             from pyflink.table.types import RowType
-            if not isinstance(input_types, collections.Iterable) \
+            if not isinstance(input_types, collections.abc.Iterable) \
                     or isinstance(input_types, RowType):
                 input_types = [input_types]
 
@@ -444,7 +444,7 @@
             func, input_types, "general", deterministic, name)
 
         from pyflink.table.types import RowType
-        if not isinstance(result_types, collections.Iterable) \
+        if not isinstance(result_types, collections.abc.Iterable) \
                 or isinstance(result_types, RowType):
             result_types = [result_types]