Merge pull request #11627 from robertwb/import-fix-155451253
Fix thread local to be initialized on every thread.
diff --git a/release/src/main/scripts/build_release_candidate.sh b/release/src/main/scripts/build_release_candidate.sh
index 9842fc5..be5a71d 100755
--- a/release/src/main/scripts/build_release_candidate.sh
+++ b/release/src/main/scripts/build_release_candidate.sh
@@ -176,6 +176,7 @@
cd sdks/python
virtualenv ${LOCAL_PYTHON_VIRTUALENV}
source ${LOCAL_PYTHON_VIRTUALENV}/bin/activate
+ pip install -r build-requirements.txt
python setup.py sdist --format=zip
cd dist
@@ -275,7 +276,7 @@
git clone ${GIT_REPO_URL}
cd ${BEAM_ROOT_DIR}
git checkout ${RELEASE_BRANCH}
- cd sdks/python && tox -e docs
+ cd sdks/python && pip install -r build-requirements.txt && tox -e py37-docs
GENERATED_PYDOC=~/${LOCAL_WEBSITE_UPDATE_DIR}/${LOCAL_PYTHON_DOC}/${BEAM_ROOT_DIR}/sdks/python/target/docs/_build
rm -rf ${GENERATED_PYDOC}/.doctrees
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 8031fdd..3c8b9a9 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -590,7 +590,7 @@
For internal use."""
def __init__(self, key_coder, window_coder):
- # type: (Coder) -> None
+ # type: (Coder, Coder) -> None
self._key_coder = key_coder
self._window_coder = window_coder
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 0cc5271..881e57f 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -60,7 +60,8 @@
if not os.path.exists(test_yaml):
raise ValueError('Could not find the test spec: %s' % test_yaml)
with open(test_yaml, 'rb') as coder_spec:
- for ix, spec in enumerate(yaml.load_all(coder_spec)):
+ for ix, spec in enumerate(
+ yaml.load_all(coder_spec, Loader=yaml.SafeLoader)):
spec['index'] = ix
name = spec.get('name', spec['coder']['urn'].split(':')[-2])
yield [name, spec]
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 992c635..c13c104 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -29,6 +29,7 @@
import sys
import unittest
import uuid
+import warnings
from hamcrest.library.text import stringmatches
from nose.plugins.attrib import attr
@@ -50,6 +51,9 @@
from apache_beam.transforms.window import FixedWindows
from apache_beam.transforms.window import GlobalWindow
+warnings.filterwarnings(
+ 'ignore', category=FutureWarning, module='apache_beam.io.fileio_test')
+
def _get_file_reader(readable_file):
if sys.version_info >= (3, 0):
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py
index f7d7f9c..67be83b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py
@@ -87,7 +87,7 @@
def table_field_to_avro_field(table_field, namespace):
- # type: (Dict[Text, Any]) -> Dict[Text, Any]
+ # type: (Dict[Text, Any], str) -> Dict[Text, Any]
"""Convert a BigQuery field to an avro field.
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index 51bb4ad..1434f54 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -119,7 +119,7 @@
return self.get_id_to_proto_map()[id]
def put_proto(self, id, proto, ignore_duplicates=False):
- # type: (str, message.Message) -> str
+ # type: (str, message.Message, bool) -> str
if not ignore_duplicates and id in self._id_to_proto:
raise ValueError("Id '%s' is already taken." % id)
elif (ignore_duplicates and id in self._id_to_proto and
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 1f7dcb5..6334dc3 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -1235,8 +1235,8 @@
pipeline_options = PipelineOptions(direct_num_workers=2)
p = beam.Pipeline(
runner=fn_api_runner.FnApiRunner(), options=pipeline_options)
- #TODO(BEAM-8444): Fix these tests..
- p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
+ # TODO(BEAM-8444): Fix these tests.
+ p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
return p
def test_metrics(self):
@@ -1255,8 +1255,8 @@
direct_num_workers=2, direct_running_mode='multi_threading')
p = beam.Pipeline(
runner=fn_api_runner.FnApiRunner(), options=pipeline_options)
- #TODO(BEAM-8444): Fix these tests..
- p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
+ # TODO(BEAM-8444): Fix these tests.
+ p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
return p
def test_metrics(self):
@@ -1283,7 +1283,8 @@
p = beam.Pipeline(
runner=fn_api_runner.FnApiRunner(bundle_repeat=3),
options=pipeline_options)
- p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
+ # TODO(BEAM-8444): Fix these tests.
+ p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
return p
def test_register_finalizations(self):
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index d8090e3..5254ea5 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -40,7 +40,6 @@
from apache_beam.transforms.core import WindowInto
from apache_beam.transforms.ptransform import PTransform
from apache_beam.transforms.util import CoGroupByKey
-from apache_beam.utils.annotations import experimental
__all__ = [
'assert_that',
@@ -317,7 +316,6 @@
actual | AssertThat() # pylint: disable=expression-not-assigned
-@experimental()
def open_shards(glob_pattern, mode='rt', encoding='utf-8'):
"""Returns a composite file of all shards matching the given glob pattern.
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index adda3fd..92b88f9 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1349,7 +1349,7 @@
return key_coder, window_coder
def to_runner_api_parameter(self, context, **extra_kwargs):
- # type: (PipelineContext) -> typing.Tuple[str, message.Message]
+ # type: (PipelineContext, Any) -> typing.Tuple[str, message.Message]
assert isinstance(self, ParDo), \
"expected instance of ParDo, but got %s" % self.__class__
picked_pardo_fn_data = pickler.dumps(self._pardo_fn_data())
@@ -2242,11 +2242,10 @@
reify_output_type = typehints.KV[
key_type, typehints.WindowedValue[value_type]] # type: ignore[misc]
gbk_input_type = (
- typehints.
- KV[key_type,
- typehints.Iterable[
- typehints.WindowedValue[ # type: ignore[misc]
- value_type]]])
+ typehints.KV[
+ key_type,
+ typehints.Iterable[typehints.WindowedValue[ # type: ignore[misc]
+ value_type]]])
gbk_output_type = typehints.KV[key_type, typehints.Iterable[value_type]]
# pylint: disable=bad-continuation
@@ -2565,7 +2564,7 @@
return super(WindowInto, self).expand(pcoll)
def to_runner_api_parameter(self, context, **extra_kwargs):
- # type: (PipelineContext) -> typing.Tuple[str, message.Message]
+ # type: (PipelineContext, Any) -> typing.Tuple[str, message.Message]
return (
common_urns.primitives.ASSIGN_WINDOWS.urn,
self.windowing.to_runner_api(context))
diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py
index a71ef09..6061987 100644
--- a/sdks/python/apache_beam/transforms/environments.py
+++ b/sdks/python/apache_beam/transforms/environments.py
@@ -258,7 +258,7 @@
@classmethod
def from_container_image(cls, container_image, artifacts=()):
- # type: (str) -> DockerEnvironment
+ # type: (str, Iterable[beam_runner_api_pb2.ArtifactInformation]) -> DockerEnvironment
return cls(
container_image=container_image,
capabilities=python_sdk_capabilities(),
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index a5c685e..40a21af 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -664,7 +664,7 @@
return register
def to_runner_api(self, context, has_parts=False, **extra_kwargs):
- # type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec
+ # type: (PipelineContext, bool, Any) -> beam_runner_api_pb2.FunctionSpec
from apache_beam.portability.api import beam_runner_api_pb2
urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
if urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts:
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 4cfdad1..ee007ff 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -710,13 +710,6 @@
result = pcoll.apply(beam.Distinct())
assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
- def test_remove_duplicates(self):
- with TestPipeline() as pipeline:
- pcoll = pipeline | 'Start' >> beam.Create(
- [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
- result = pcoll.apply(beam.RemoveDuplicates())
- assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
-
def test_chained_ptransforms(self):
with TestPipeline() as pipeline:
t = (
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index 02224c7..985a665 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -61,7 +61,8 @@
return TimeDomain._RUNNER_API_MAPPING[domain]
-class TimestampCombinerImpl(with_metaclass(ABCMeta, object)): # type: ignore[misc]
+class TimestampCombinerImpl(with_metaclass(ABCMeta,
+ object)): # type: ignore[misc]
"""Implementation of TimestampCombiner."""
@abstractmethod
def assign_output_time(self, window, input_timestamp):
@@ -86,7 +87,8 @@
return self.combine_all(merging_timestamps)
-class DependsOnlyOnWindow(with_metaclass(ABCMeta, TimestampCombinerImpl)): # type: ignore[misc]
+class DependsOnlyOnWindow(with_metaclass(ABCMeta, TimestampCombinerImpl)
+ ): # type: ignore[misc]
"""TimestampCombinerImpl that only depends on the window."""
def merge(self, result_window, unused_merging_timestamps):
# Since we know that the result only depends on the window, we can ignore
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 6ce9ba1..57393d8 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -617,7 +617,8 @@
return self.underlying.has_ontime_pane()
-class _ParallelTriggerFn(with_metaclass(ABCMeta, TriggerFn)): # type: ignore[misc]
+class _ParallelTriggerFn(with_metaclass(ABCMeta,
+ TriggerFn)): # type: ignore[misc]
def __init__(self, *triggers):
self.triggers = triggers
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 202bdbd..82df307 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -572,7 +572,8 @@
# runner does not execute this method directly as a test.
@classmethod
def _create_tests(cls, transcript_filename):
- for spec in yaml.load_all(open(transcript_filename)):
+ for spec in yaml.load_all(open(transcript_filename),
+ Loader=yaml.SafeLoader):
cls._create_test(spec)
def _run_log_test(self, spec):
@@ -1005,7 +1006,7 @@
with TestPipeline() as p:
# TODO(BEAM-8601): Pass this during pipeline construction.
- p.options.view_as(StandardOptions).streaming = True
+ p._options.view_as(StandardOptions).streaming = True
# We can have at most one test stream per pipeline, so we share it.
inputs_and_expected = p | read_test_stream
diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py
index 56ba8d9..d9ad166 100644
--- a/sdks/python/apache_beam/transforms/userstate.py
+++ b/sdks/python/apache_beam/transforms/userstate.py
@@ -158,7 +158,7 @@
return '%s(%s)' % (self.__class__.__name__, self.name)
def to_runner_api(self, context, key_coder, window_coder):
- # type: (PipelineContext) -> beam_runner_api_pb2.TimerFamilySpec
+ # type: (PipelineContext, Coder, Coder) -> beam_runner_api_pb2.TimerFamilySpec
return beam_runner_api_pb2.TimerFamilySpec(
time_domain=TimeDomain.to_runner_api(self.time_domain),
timer_family_coder_id=context.coders.get_id(
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 5602766..e261f93 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -29,7 +29,6 @@
import re
import sys
import time
-import warnings
from builtins import filter
from builtins import object
from builtins import range
@@ -758,9 +757,6 @@
Arguments:
batch_size: (required) How many elements should be in a batch
"""
- warnings.warn(
- 'Use of GroupIntoBatches transform requires State/Timer '
- 'support from the runner')
self.batch_size = batch_size
def expand(self, pcoll):
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index e586627..870b104 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -29,6 +29,7 @@
import re
import time
import unittest
+import warnings
from builtins import object
from builtins import range
@@ -61,6 +62,9 @@
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.windowed_value import WindowedValue
+warnings.filterwarnings(
+ 'ignore', category=FutureWarning, module='apache_beam.transforms.util_test')
+
class FakeClock(object):
def __init__(self):
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index a92b7e0..4889d34 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -119,7 +119,8 @@
raise ValueError('Invalid TimestampCombiner: %s.' % timestamp_combiner)
-class WindowFn(with_metaclass(abc.ABCMeta, urns.RunnerApiFn)): # type: ignore[misc]
+class WindowFn(with_metaclass(abc.ABCMeta,
+ urns.RunnerApiFn)): # type: ignore[misc]
"""An abstract windowing function defining a basic assign and merge."""
class AssignContext(object):
"""Context passed to WindowFn.assign()."""
diff --git a/sdks/python/mypy.ini b/sdks/python/mypy.ini
index 1e4748f..151eebf 100644
--- a/sdks/python/mypy.ini
+++ b/sdks/python/mypy.ini
@@ -58,3 +58,65 @@
[mypy-apache_beam.typehints.typehints_test_py3]
# error: Signature of "process" incompatible with supertype "DoFn" [override]
ignore_errors = true
+
+
+# TODO(BEAM-7746): Remove the lines below.
+[mypy-apache_beam.coders.coders]
+ignore_errors = true
+
+[mypy-apache_beam.coders.*]
+ignore_errors = true
+
+[mypy-apache_beam.dataframe.*]
+ignore_errors = true
+
+[mypy-apache_beam.io.*]
+ignore_errors = true
+
+[mypy-apache_beam.ml.gcp.*]
+ignore_errors = true
+
+[mypy-apache_beam.pipeline]
+ignore_errors = true
+
+[mypy-apache_beam.pvalue]
+ignore_errors = true
+
+[mypy-apache_beam.runners.common]
+ignore_errors = true
+
+[mypy-apache_beam.runners.dataflow.dataflow_runner]
+ignore_errors = true
+
+[mypy-apache_beam.runners.direct.*]
+ignore_errors = true
+
+[mypy-apache_beam.runners.interactive.*]
+ignore_errors = true
+
+[mypy-apache_beam.runners.pipeline_context]
+ignore_errors = true
+
+[mypy-apache_beam.runners.portability.artifact_service]
+ignore_errors = true
+
+[mypy-apache_beam.runners.portability.fn_api_runner.*]
+ignore_errors = true
+
+[mypy-apache_beam.runners.portability.portable_runner]
+ignore_errors = true
+
+[mypy-apache_beam.runners.portability.stager]
+ignore_errors = true
+
+[mypy-apache_beam.runners.worker.*]
+ignore_errors = true
+
+[mypy-apache_beam.testing.synthetic_pipeline]
+ignore_errors = true
+
+[mypy-apache_beam.transforms.*]
+ignore_errors = true
+
+[mypy-apache_beam.typehints.*]
+ignore_errors = true
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index da689a7..3b66c28 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -193,10 +193,9 @@
# make extras available in case any of these libs are typed
extras =
gcp
-# TODO: enable c test failures
commands =
mypy --version
- - python setup.py mypy
+ python setup.py mypy
[testenv:py37-docs]
extras = test,gcp,docs,interactive
diff --git a/website/src/contribute/release-guide.md b/website/src/contribute/release-guide.md
index 8747989..3c50735 100644
--- a/website/src/contribute/release-guide.md
+++ b/website/src/contribute/release-guide.md
@@ -665,6 +665,7 @@
Build python binaries in release branch in sdks/python dir.
+ pip install -r build-requirements.txt
python setup.py sdist --format=zip
cd dist
cp apache-beam-${RELEASE}.zip staging/apache-beam-${RELEASE}-python.zip
@@ -780,7 +781,7 @@
```
Create the Python SDK documentation using sphinx by running a helper script.
```
-cd sdks/python && tox -e docs
+cd sdks/python && pip install -r build-requirements.txt && tox -e py37-docs
```
By default the Pydoc is generated in `sdks/python/target/docs/_build`. Let `${PYDOC_ROOT}` be the absolute path to `_build`.