Remove a bunch of spurious warnings in tests. (#11621)

diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 0cc5271..881e57f 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -60,7 +60,8 @@
   if not os.path.exists(test_yaml):
     raise ValueError('Could not find the test spec: %s' % test_yaml)
   with open(test_yaml, 'rb') as coder_spec:
-    for ix, spec in enumerate(yaml.load_all(coder_spec)):
+    for ix, spec in enumerate(
+        yaml.load_all(coder_spec, Loader=yaml.SafeLoader)):
       spec['index'] = ix
       name = spec.get('name', spec['coder']['urn'].split(':')[-2])
       yield [name, spec]
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 992c635..c13c104 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -29,6 +29,7 @@
 import sys
 import unittest
 import uuid
+import warnings
 
 from hamcrest.library.text import stringmatches
 from nose.plugins.attrib import attr
@@ -50,6 +51,9 @@
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import GlobalWindow
 
+warnings.filterwarnings(
+    'ignore', category=FutureWarning, module='apache_beam.io.fileio_test')
+
 
 def _get_file_reader(readable_file):
   if sys.version_info >= (3, 0):
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 1f7dcb5..6334dc3 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -1235,8 +1235,8 @@
     pipeline_options = PipelineOptions(direct_num_workers=2)
     p = beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(), options=pipeline_options)
-    #TODO(BEAM-8444): Fix these tests..
-    p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
+    #TODO(BEAM-8444): Fix these tests.
+    p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
     return p
 
   def test_metrics(self):
@@ -1255,8 +1255,8 @@
         direct_num_workers=2, direct_running_mode='multi_threading')
     p = beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(), options=pipeline_options)
-    #TODO(BEAM-8444): Fix these tests..
-    p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
+    #TODO(BEAM-8444): Fix these tests.
+    p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
     return p
 
   def test_metrics(self):
@@ -1283,7 +1283,8 @@
     p = beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(bundle_repeat=3),
         options=pipeline_options)
-    p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
+    #TODO(BEAM-8444): Fix these tests.
+    p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
     return p
 
   def test_register_finalizations(self):
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index d8090e3..5254ea5 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -40,7 +40,6 @@
 from apache_beam.transforms.core import WindowInto
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.util import CoGroupByKey
-from apache_beam.utils.annotations import experimental
 
 __all__ = [
     'assert_that',
@@ -317,7 +316,6 @@
   actual | AssertThat()  # pylint: disable=expression-not-assigned
 
 
-@experimental()
 def open_shards(glob_pattern, mode='rt', encoding='utf-8'):
   """Returns a composite file of all shards matching the given glob pattern.
 
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 931f7f0..92b88f9 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -2242,11 +2242,10 @@
       reify_output_type = typehints.KV[
           key_type, typehints.WindowedValue[value_type]]  # type: ignore[misc]
       gbk_input_type = (
-          typehints.
-          KV[key_type,
-             typehints.Iterable[
-                 typehints.WindowedValue[  # type: ignore[misc]
-                     value_type]]])
+          typehints.KV[
+              key_type,
+              typehints.Iterable[typehints.WindowedValue[  # type: ignore[misc]
+                  value_type]]])
       gbk_output_type = typehints.KV[key_type, typehints.Iterable[value_type]]
 
       # pylint: disable=bad-continuation
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 4cfdad1..ee007ff 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -710,13 +710,6 @@
       result = pcoll.apply(beam.Distinct())
       assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
 
-  def test_remove_duplicates(self):
-    with TestPipeline() as pipeline:
-      pcoll = pipeline | 'Start' >> beam.Create(
-          [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
-      result = pcoll.apply(beam.RemoveDuplicates())
-      assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
-
   def test_chained_ptransforms(self):
     with TestPipeline() as pipeline:
       t = (
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index 02224c7..985a665 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -61,7 +61,8 @@
     return TimeDomain._RUNNER_API_MAPPING[domain]
 
 
-class TimestampCombinerImpl(with_metaclass(ABCMeta, object)):  # type: ignore[misc]
+class TimestampCombinerImpl(with_metaclass(ABCMeta,
+                                           object)):  # type: ignore[misc]
   """Implementation of TimestampCombiner."""
   @abstractmethod
   def assign_output_time(self, window, input_timestamp):
@@ -86,7 +87,8 @@
     return self.combine_all(merging_timestamps)
 
 
-class DependsOnlyOnWindow(with_metaclass(ABCMeta, TimestampCombinerImpl)):  # type: ignore[misc]
+class DependsOnlyOnWindow(with_metaclass(ABCMeta, TimestampCombinerImpl)
+                          ):  # type: ignore[misc]
   """TimestampCombinerImpl that only depends on the window."""
   def merge(self, result_window, unused_merging_timestamps):
     # Since we know that the result only depends on the window, we can ignore
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 6ce9ba1..57393d8 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -617,7 +617,8 @@
     return self.underlying.has_ontime_pane()
 
 
-class _ParallelTriggerFn(with_metaclass(ABCMeta, TriggerFn)):  # type: ignore[misc]
+class _ParallelTriggerFn(with_metaclass(ABCMeta,
+                                        TriggerFn)):  # type: ignore[misc]
   def __init__(self, *triggers):
     self.triggers = triggers
 
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 202bdbd..82df307 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -572,7 +572,8 @@
   # runner does not execute this method directly as a test.
   @classmethod
   def _create_tests(cls, transcript_filename):
-    for spec in yaml.load_all(open(transcript_filename)):
+    for spec in yaml.load_all(open(transcript_filename),
+                              Loader=yaml.SafeLoader):
       cls._create_test(spec)
 
   def _run_log_test(self, spec):
@@ -1005,7 +1006,7 @@
 
     with TestPipeline() as p:
       # TODO(BEAM-8601): Pass this during pipeline construction.
-      p.options.view_as(StandardOptions).streaming = True
+      p._options.view_as(StandardOptions).streaming = True
 
       # We can have at most one test stream per pipeline, so we share it.
       inputs_and_expected = p | read_test_stream
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 5602766..e261f93 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -29,7 +29,6 @@
 import re
 import sys
 import time
-import warnings
 from builtins import filter
 from builtins import object
 from builtins import range
@@ -758,9 +757,6 @@
     Arguments:
       batch_size: (required) How many elements should be in a batch
     """
-    warnings.warn(
-        'Use of GroupIntoBatches transform requires State/Timer '
-        'support from the runner')
     self.batch_size = batch_size
 
   def expand(self, pcoll):
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index e586627..870b104 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -29,6 +29,7 @@
 import re
 import time
 import unittest
+import warnings
 from builtins import object
 from builtins import range
 
@@ -61,6 +62,9 @@
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 from apache_beam.utils.windowed_value import WindowedValue
 
+warnings.filterwarnings(
+    'ignore', category=FutureWarning, module='apache_beam.transform.util_test')
+
 
 class FakeClock(object):
   def __init__(self):
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index a92b7e0..4889d34 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -119,7 +119,8 @@
       raise ValueError('Invalid TimestampCombiner: %s.' % timestamp_combiner)
 
 
-class WindowFn(with_metaclass(abc.ABCMeta, urns.RunnerApiFn)):  # type: ignore[misc]
+class WindowFn(with_metaclass(abc.ABCMeta,
+                              urns.RunnerApiFn)):  # type: ignore[misc]
   """An abstract windowing function defining a basic assign and merge."""
   class AssignContext(object):
     """Context passed to WindowFn.assign()."""