Remove a bunch of spurious warnings in tests. (#11621)
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 0cc5271..881e57f 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -60,7 +60,8 @@
if not os.path.exists(test_yaml):
raise ValueError('Could not find the test spec: %s' % test_yaml)
with open(test_yaml, 'rb') as coder_spec:
- for ix, spec in enumerate(yaml.load_all(coder_spec)):
+ for ix, spec in enumerate(
+ yaml.load_all(coder_spec, Loader=yaml.SafeLoader)):
spec['index'] = ix
name = spec.get('name', spec['coder']['urn'].split(':')[-2])
yield [name, spec]
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 992c635..c13c104 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -29,6 +29,7 @@
import sys
import unittest
import uuid
+import warnings
from hamcrest.library.text import stringmatches
from nose.plugins.attrib import attr
@@ -50,6 +51,9 @@
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import GlobalWindow
+warnings.filterwarnings(
+ 'ignore', category=FutureWarning, module='apache_beam.io.fileio_test')
+
def _get_file_reader(readable_file):
if sys.version_info >= (3, 0):
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 1f7dcb5..6334dc3 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -1235,8 +1235,8 @@
pipeline_options = PipelineOptions(direct_num_workers=2)
p = beam.Pipeline(
runner=fn_api_runner.FnApiRunner(), options=pipeline_options)
- #TODO(BEAM-8444): Fix these tests..
- p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
+ #TODO(BEAM-8444): Fix these tests.
+ p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
return p
def test_metrics(self):
@@ -1255,8 +1255,8 @@
direct_num_workers=2, direct_running_mode='multi_threading')
p = beam.Pipeline(
runner=fn_api_runner.FnApiRunner(), options=pipeline_options)
- #TODO(BEAM-8444): Fix these tests..
- p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
+ #TODO(BEAM-8444): Fix these tests.
+ p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
return p
def test_metrics(self):
@@ -1283,7 +1283,8 @@
p = beam.Pipeline(
runner=fn_api_runner.FnApiRunner(bundle_repeat=3),
options=pipeline_options)
- p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
+ #TODO(BEAM-8444): Fix these tests.
+ p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
return p
def test_register_finalizations(self):
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index d8090e3..5254ea5 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -40,7 +40,6 @@
from apache_beam.transforms.core import WindowInto
from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.util import CoGroupByKey
-from apache_beam.utils.annotations import experimental
__all__ = [
'assert_that',
@@ -317,7 +316,6 @@
actual | AssertThat() # pylint: disable=expression-not-assigned
-@experimental()
def open_shards(glob_pattern, mode='rt', encoding='utf-8'):
"""Returns a composite file of all shards matching the given glob pattern.
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 931f7f0..92b88f9 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -2242,11 +2242,10 @@
reify_output_type = typehints.KV[
key_type, typehints.WindowedValue[value_type]] # type: ignore[misc]
gbk_input_type = (
- typehints.
- KV[key_type,
- typehints.Iterable[
- typehints.WindowedValue[ # type: ignore[misc]
- value_type]]])
+ typehints.KV[
+ key_type,
+ typehints.Iterable[typehints.WindowedValue[ # type: ignore[misc]
+ value_type]]])
gbk_output_type = typehints.KV[key_type, typehints.Iterable[value_type]]
# pylint: disable=bad-continuation
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 4cfdad1..ee007ff 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -710,13 +710,6 @@
result = pcoll.apply(beam.Distinct())
assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
- def test_remove_duplicates(self):
- with TestPipeline() as pipeline:
- pcoll = pipeline | 'Start' >> beam.Create(
- [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
- result = pcoll.apply(beam.RemoveDuplicates())
- assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
-
def test_chained_ptransforms(self):
with TestPipeline() as pipeline:
t = (
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index 02224c7..985a665 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -61,7 +61,8 @@
return TimeDomain._RUNNER_API_MAPPING[domain]
-class TimestampCombinerImpl(with_metaclass(ABCMeta, object)): # type: ignore[misc]
+class TimestampCombinerImpl(with_metaclass(ABCMeta,
+ object)): # type: ignore[misc]
"""Implementation of TimestampCombiner."""
@abstractmethod
def assign_output_time(self, window, input_timestamp):
@@ -86,7 +87,8 @@
return self.combine_all(merging_timestamps)
-class DependsOnlyOnWindow(with_metaclass(ABCMeta, TimestampCombinerImpl)): # type: ignore[misc]
+class DependsOnlyOnWindow(with_metaclass(ABCMeta, TimestampCombinerImpl)
+ ): # type: ignore[misc]
"""TimestampCombinerImpl that only depends on the window."""
def merge(self, result_window, unused_merging_timestamps):
# Since we know that the result only depends on the window, we can ignore
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 6ce9ba1..57393d8 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -617,7 +617,8 @@
return self.underlying.has_ontime_pane()
-class _ParallelTriggerFn(with_metaclass(ABCMeta, TriggerFn)): # type: ignore[misc]
+class _ParallelTriggerFn(with_metaclass(ABCMeta,
+ TriggerFn)): # type: ignore[misc]
def __init__(self, *triggers):
self.triggers = triggers
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 202bdbd..82df307 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -572,7 +572,8 @@
# runner does not execute this method directly as a test.
@classmethod
def _create_tests(cls, transcript_filename):
- for spec in yaml.load_all(open(transcript_filename)):
+ for spec in yaml.load_all(open(transcript_filename),
+ Loader=yaml.SafeLoader):
cls._create_test(spec)
def _run_log_test(self, spec):
@@ -1005,7 +1006,7 @@
with TestPipeline() as p:
# TODO(BEAM-8601): Pass this during pipeline construction.
- p.options.view_as(StandardOptions).streaming = True
+ p._options.view_as(StandardOptions).streaming = True
# We can have at most one test stream per pipeline, so we share it.
inputs_and_expected = p | read_test_stream
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 5602766..e261f93 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -29,7 +29,6 @@
import re
import sys
import time
-import warnings
from builtins import filter
from builtins import object
from builtins import range
@@ -758,9 +757,6 @@
Arguments:
batch_size: (required) How many elements should be in a batch
"""
- warnings.warn(
- 'Use of GroupIntoBatches transform requires State/Timer '
- 'support from the runner')
self.batch_size = batch_size
def expand(self, pcoll):
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index e586627..870b104 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -29,6 +29,7 @@
import re
import time
import unittest
+import warnings
from builtins import object
from builtins import range
@@ -61,6 +62,9 @@
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.windowed_value import WindowedValue
+warnings.filterwarnings(
+ 'ignore', category=FutureWarning, module='apache_beam.transform.util_test')
+
class FakeClock(object):
def __init__(self):
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index a92b7e0..4889d34 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -119,7 +119,8 @@
raise ValueError('Invalid TimestampCombiner: %s.' % timestamp_combiner)
-class WindowFn(with_metaclass(abc.ABCMeta, urns.RunnerApiFn)): # type: ignore[misc]
+class WindowFn(with_metaclass(abc.ABCMeta,
+ urns.RunnerApiFn)): # type: ignore[misc]
"""An abstract windowing function defining a basic assign and merge."""
class AssignContext(object):
"""Context passed to WindowFn.assign()."""