Merge pull request #11627 from robertwb/import-fix-155451253

Fix thread local to be initialized on every thread.
diff --git a/release/src/main/scripts/build_release_candidate.sh b/release/src/main/scripts/build_release_candidate.sh
index 9842fc5..be5a71d 100755
--- a/release/src/main/scripts/build_release_candidate.sh
+++ b/release/src/main/scripts/build_release_candidate.sh
@@ -176,6 +176,7 @@
   cd sdks/python
   virtualenv ${LOCAL_PYTHON_VIRTUALENV}
   source ${LOCAL_PYTHON_VIRTUALENV}/bin/activate
+  pip install -r build-requirements.txt
   python setup.py sdist --format=zip
   cd dist
 
@@ -275,7 +276,7 @@
   git clone ${GIT_REPO_URL}
   cd ${BEAM_ROOT_DIR}
   git checkout ${RELEASE_BRANCH}
-  cd sdks/python && tox -e docs
+  cd sdks/python && pip install -r build-requirements.txt && tox -e py37-docs
   GENERATED_PYDOC=~/${LOCAL_WEBSITE_UPDATE_DIR}/${LOCAL_PYTHON_DOC}/${BEAM_ROOT_DIR}/sdks/python/target/docs/_build
   rm -rf ${GENERATED_PYDOC}/.doctrees
 
diff --git a/sdks/python/apache_beam/coders/coders.py b/sdks/python/apache_beam/coders/coders.py
index 8031fdd..3c8b9a9 100644
--- a/sdks/python/apache_beam/coders/coders.py
+++ b/sdks/python/apache_beam/coders/coders.py
@@ -590,7 +590,7 @@
 
   For internal use."""
   def __init__(self, key_coder, window_coder):
-    # type: (Coder) -> None
+    # type: (Coder, Coder) -> None
     self._key_coder = key_coder
     self._window_coder = window_coder
 
diff --git a/sdks/python/apache_beam/coders/standard_coders_test.py b/sdks/python/apache_beam/coders/standard_coders_test.py
index 0cc5271..881e57f 100644
--- a/sdks/python/apache_beam/coders/standard_coders_test.py
+++ b/sdks/python/apache_beam/coders/standard_coders_test.py
@@ -60,7 +60,8 @@
   if not os.path.exists(test_yaml):
     raise ValueError('Could not find the test spec: %s' % test_yaml)
   with open(test_yaml, 'rb') as coder_spec:
-    for ix, spec in enumerate(yaml.load_all(coder_spec)):
+    for ix, spec in enumerate(
+        yaml.load_all(coder_spec, Loader=yaml.SafeLoader)):
       spec['index'] = ix
       name = spec.get('name', spec['coder']['urn'].split(':')[-2])
       yield [name, spec]
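
Note on the yaml.load_all change (the same pattern appears in trigger_test.py
further down): PyYAML 5.1 deprecated calling load()/load_all() without an
explicit Loader, because the default loader can construct arbitrary Python
objects. A minimal standalone sketch of the safe pattern; the YAML documents
here are illustrative, not taken from the test spec:

    import yaml

    docs = "name: bytes\n---\nname: varint\n"
    # SafeLoader builds only plain Python types; passing a Loader explicitly
    # also silences the PyYAML deprecation warning for bare load_all().
    for ix, spec in enumerate(yaml.load_all(docs, Loader=yaml.SafeLoader)):
        print(ix, spec['name'])
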
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 992c635..c13c104 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -29,6 +29,7 @@
 import sys
 import unittest
 import uuid
+import warnings
 
 from hamcrest.library.text import stringmatches
 from nose.plugins.attrib import attr
@@ -50,6 +51,9 @@
 from apache_beam.transforms.window import FixedWindows
 from apache_beam.transforms.window import GlobalWindow
 
+warnings.filterwarnings(
+    'ignore', category=FutureWarning, module='apache_beam.io.fileio_test')
+
 
 def _get_file_reader(readable_file):
   if sys.version_info >= (3, 0):
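
Note: warnings.filterwarnings(..., module=...) takes a regular expression
matched against the name of the module that raises the warning, so the filter
added above only silences FutureWarnings originating from this test module.
A standalone sketch of the mechanism; the module name is illustrative:

    import warnings

    # Only FutureWarnings raised from code in mypkg.mymod are ignored;
    # the same warning from any other module still surfaces.
    warnings.filterwarnings(
        'ignore', category=FutureWarning, module='mypkg.mymod')
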
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py b/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py
index f7d7f9c..67be83b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_avro_tools.py
@@ -87,7 +87,7 @@
 
 
 def table_field_to_avro_field(table_field, namespace):
-  # type: (Dict[Text, Any]) -> Dict[Text, Any]
+  # type: (Dict[Text, Any], str) -> Dict[Text, Any]
 
   """Convert a BigQuery field to an avro field.
 
diff --git a/sdks/python/apache_beam/runners/pipeline_context.py b/sdks/python/apache_beam/runners/pipeline_context.py
index 51bb4ad..1434f54 100644
--- a/sdks/python/apache_beam/runners/pipeline_context.py
+++ b/sdks/python/apache_beam/runners/pipeline_context.py
@@ -119,7 +119,7 @@
     return self.get_id_to_proto_map()[id]
 
   def put_proto(self, id, proto, ignore_duplicates=False):
-    # type: (str, message.Message) -> str
+    # type: (str, message.Message, bool) -> str
     if not ignore_duplicates and id in self._id_to_proto:
       raise ValueError("Id '%s' is already taken." % id)
     elif (ignore_duplicates and id in self._id_to_proto and
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
index 1f7dcb5..6334dc3 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner_test.py
@@ -1235,8 +1235,8 @@
     pipeline_options = PipelineOptions(direct_num_workers=2)
     p = beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(), options=pipeline_options)
-    #TODO(BEAM-8444): Fix these tests..
-    p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
+    #TODO(BEAM-8444): Fix these tests.
+    p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
     return p
 
   def test_metrics(self):
@@ -1255,8 +1255,8 @@
         direct_num_workers=2, direct_running_mode='multi_threading')
     p = beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(), options=pipeline_options)
-    #TODO(BEAM-8444): Fix these tests..
-    p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
+    #TODO(BEAM-8444): Fix these tests.
+    p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
     return p
 
   def test_metrics(self):
@@ -1283,7 +1283,8 @@
     p = beam.Pipeline(
         runner=fn_api_runner.FnApiRunner(bundle_repeat=3),
         options=pipeline_options)
-    p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
+    #TODO(BEAM-8444): Fix these tests.
+    p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
     return p
 
   def test_register_finalizations(self):
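
Note on the p.options to p._options switches here (and in trigger_test.py
below): the public Pipeline.options property is deprecated in this codebase
and emits a warning on access, while _options is the private attribute backing
it. A minimal sketch of the pattern, assuming a locally constructed pipeline:

    import apache_beam as beam
    from apache_beam.options.pipeline_options import DebugOptions
    from apache_beam.options.pipeline_options import PipelineOptions

    p = beam.Pipeline(options=PipelineOptions(['--experiments=beam_fn_api']))
    # Reading the deprecated p.options property would emit a warning;
    # _options holds the same PipelineOptions instance.
    p._options.view_as(DebugOptions).experiments.remove('beam_fn_api')
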
diff --git a/sdks/python/apache_beam/testing/util.py b/sdks/python/apache_beam/testing/util.py
index d8090e3..5254ea5 100644
--- a/sdks/python/apache_beam/testing/util.py
+++ b/sdks/python/apache_beam/testing/util.py
@@ -40,7 +40,6 @@
 from apache_beam.transforms.core import WindowInto
 from apache_beam.transforms.ptransform import PTransform
 from apache_beam.transforms.util import CoGroupByKey
-from apache_beam.utils.annotations import experimental
 
 __all__ = [
     'assert_that',
@@ -317,7 +316,6 @@
   actual | AssertThat()  # pylint: disable=expression-not-assigned
 
 
-@experimental()
 def open_shards(glob_pattern, mode='rt', encoding='utf-8'):
   """Returns a composite file of all shards matching the given glob pattern.
 
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index adda3fd..92b88f9 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -1349,7 +1349,7 @@
     return key_coder, window_coder
 
   def to_runner_api_parameter(self, context, **extra_kwargs):
-    # type: (PipelineContext) -> typing.Tuple[str, message.Message]
+    # type: (PipelineContext, Any) -> typing.Tuple[str, message.Message]
     assert isinstance(self, ParDo), \
         "expected instance of ParDo, but got %s" % self.__class__
     picked_pardo_fn_data = pickler.dumps(self._pardo_fn_data())
@@ -2242,11 +2242,10 @@
       reify_output_type = typehints.KV[
           key_type, typehints.WindowedValue[value_type]]  # type: ignore[misc]
       gbk_input_type = (
-          typehints.
-          KV[key_type,
-             typehints.Iterable[
-                 typehints.WindowedValue[  # type: ignore[misc]
-                     value_type]]])
+          typehints.KV[
+              key_type,
+              typehints.Iterable[typehints.WindowedValue[  # type: ignore[misc]
+                  value_type]]])
       gbk_output_type = typehints.KV[key_type, typehints.Iterable[value_type]]
 
       # pylint: disable=bad-continuation
@@ -2565,7 +2564,7 @@
     return super(WindowInto, self).expand(pcoll)
 
   def to_runner_api_parameter(self, context, **extra_kwargs):
-    # type: (PipelineContext) -> typing.Tuple[str, message.Message]
+    # type: (PipelineContext, Any) -> typing.Tuple[str, message.Message]
     return (
         common_urns.primitives.ASSIGN_WINDOWS.urn,
         self.windowing.to_runner_api(context))
diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py
index a71ef09..6061987 100644
--- a/sdks/python/apache_beam/transforms/environments.py
+++ b/sdks/python/apache_beam/transforms/environments.py
@@ -258,7 +258,7 @@
 
   @classmethod
   def from_container_image(cls, container_image, artifacts=()):
-    # type: (str) -> DockerEnvironment
+    # type: (str, Iterable[beam_runner_api_pb2.ArtifactInformation]) -> DockerEnvironment
     return cls(
         container_image=container_image,
         capabilities=python_sdk_capabilities(),
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index a5c685e..40a21af 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -664,7 +664,7 @@
       return register
 
   def to_runner_api(self, context, has_parts=False, **extra_kwargs):
-    # type: (PipelineContext, bool) -> beam_runner_api_pb2.FunctionSpec
+    # type: (PipelineContext, bool, Any) -> beam_runner_api_pb2.FunctionSpec
     from apache_beam.portability.api import beam_runner_api_pb2
     urn, typed_param = self.to_runner_api_parameter(context, **extra_kwargs)
     if urn == python_urns.GENERIC_COMPOSITE_TRANSFORM and not has_parts:
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index 4cfdad1..ee007ff 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -710,13 +710,6 @@
       result = pcoll.apply(beam.Distinct())
       assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
 
-  def test_remove_duplicates(self):
-    with TestPipeline() as pipeline:
-      pcoll = pipeline | 'Start' >> beam.Create(
-          [6, 3, 1, 1, 9, 'pleat', 'pleat', 'kazoo', 'navel'])
-      result = pcoll.apply(beam.RemoveDuplicates())
-      assert_that(result, equal_to([1, 3, 6, 9, 'pleat', 'kazoo', 'navel']))
-
   def test_chained_ptransforms(self):
     with TestPipeline() as pipeline:
       t = (
diff --git a/sdks/python/apache_beam/transforms/timeutil.py b/sdks/python/apache_beam/transforms/timeutil.py
index 02224c7..985a665 100644
--- a/sdks/python/apache_beam/transforms/timeutil.py
+++ b/sdks/python/apache_beam/transforms/timeutil.py
@@ -61,7 +61,8 @@
     return TimeDomain._RUNNER_API_MAPPING[domain]
 
 
-class TimestampCombinerImpl(with_metaclass(ABCMeta, object)):  # type: ignore[misc]
+class TimestampCombinerImpl(with_metaclass(ABCMeta,
+                                           object)):  # type: ignore[misc]
   """Implementation of TimestampCombiner."""
   @abstractmethod
   def assign_output_time(self, window, input_timestamp):
@@ -86,7 +87,8 @@
     return self.combine_all(merging_timestamps)
 
 
-class DependsOnlyOnWindow(with_metaclass(ABCMeta, TimestampCombinerImpl)):  # type: ignore[misc]
+class DependsOnlyOnWindow(with_metaclass(ABCMeta, TimestampCombinerImpl)
+                          ):  # type: ignore[misc]
   """TimestampCombinerImpl that only depends on the window."""
   def merge(self, result_window, unused_merging_timestamps):
     # Since we know that the result only depends on the window, we can ignore
diff --git a/sdks/python/apache_beam/transforms/trigger.py b/sdks/python/apache_beam/transforms/trigger.py
index 6ce9ba1..57393d8 100644
--- a/sdks/python/apache_beam/transforms/trigger.py
+++ b/sdks/python/apache_beam/transforms/trigger.py
@@ -617,7 +617,8 @@
     return self.underlying.has_ontime_pane()
 
 
-class _ParallelTriggerFn(with_metaclass(ABCMeta, TriggerFn)):  # type: ignore[misc]
+class _ParallelTriggerFn(with_metaclass(ABCMeta,
+                                        TriggerFn)):  # type: ignore[misc]
   def __init__(self, *triggers):
     self.triggers = triggers
 
diff --git a/sdks/python/apache_beam/transforms/trigger_test.py b/sdks/python/apache_beam/transforms/trigger_test.py
index 202bdbd..82df307 100644
--- a/sdks/python/apache_beam/transforms/trigger_test.py
+++ b/sdks/python/apache_beam/transforms/trigger_test.py
@@ -572,7 +572,8 @@
   # runner does not execute this method directly as a test.
   @classmethod
   def _create_tests(cls, transcript_filename):
-    for spec in yaml.load_all(open(transcript_filename)):
+    for spec in yaml.load_all(open(transcript_filename),
+                              Loader=yaml.SafeLoader):
       cls._create_test(spec)
 
   def _run_log_test(self, spec):
@@ -1005,7 +1006,7 @@
 
     with TestPipeline() as p:
       # TODO(BEAM-8601): Pass this during pipeline construction.
-      p.options.view_as(StandardOptions).streaming = True
+      p._options.view_as(StandardOptions).streaming = True
 
       # We can have at most one test stream per pipeline, so we share it.
       inputs_and_expected = p | read_test_stream
diff --git a/sdks/python/apache_beam/transforms/userstate.py b/sdks/python/apache_beam/transforms/userstate.py
index 56ba8d9..d9ad166 100644
--- a/sdks/python/apache_beam/transforms/userstate.py
+++ b/sdks/python/apache_beam/transforms/userstate.py
@@ -158,7 +158,7 @@
     return '%s(%s)' % (self.__class__.__name__, self.name)
 
   def to_runner_api(self, context, key_coder, window_coder):
-    # type: (PipelineContext) -> beam_runner_api_pb2.TimerFamilySpec
+    # type: (PipelineContext, Coder, Coder) -> beam_runner_api_pb2.TimerFamilySpec
     return beam_runner_api_pb2.TimerFamilySpec(
         time_domain=TimeDomain.to_runner_api(self.time_domain),
         timer_family_coder_id=context.coders.get_id(
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 5602766..e261f93 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -29,7 +29,6 @@
 import re
 import sys
 import time
-import warnings
 from builtins import filter
 from builtins import object
 from builtins import range
@@ -758,9 +757,6 @@
     Arguments:
       batch_size: (required) How many elements should be in a batch
     """
-    warnings.warn(
-        'Use of GroupIntoBatches transform requires State/Timer '
-        'support from the runner')
     self.batch_size = batch_size
 
   def expand(self, pcoll):
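
Note: the removed warning fired unconditionally in __init__, whether or not
the chosen runner supports state and timers. For context, a minimal usage
sketch of the transform, assuming the default direct runner:

    import apache_beam as beam

    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([('k', i) for i in range(5)])
            # Emits ('k', [0, 1]), ('k', [2, 3]), ('k', [4]) in some order.
            | beam.GroupIntoBatches(2)
            | beam.Map(print))
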
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index e586627..870b104 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -29,6 +29,7 @@
 import re
 import time
 import unittest
+import warnings
 from builtins import object
 from builtins import range
 
@@ -61,6 +62,9 @@
 from apache_beam.utils.timestamp import MIN_TIMESTAMP
 from apache_beam.utils.windowed_value import WindowedValue
 
+warnings.filterwarnings(
+    'ignore', category=FutureWarning, module='apache_beam.transforms.util_test')
+
 
 class FakeClock(object):
   def __init__(self):
diff --git a/sdks/python/apache_beam/transforms/window.py b/sdks/python/apache_beam/transforms/window.py
index a92b7e0..4889d34 100644
--- a/sdks/python/apache_beam/transforms/window.py
+++ b/sdks/python/apache_beam/transforms/window.py
@@ -119,7 +119,8 @@
       raise ValueError('Invalid TimestampCombiner: %s.' % timestamp_combiner)
 
 
-class WindowFn(with_metaclass(abc.ABCMeta, urns.RunnerApiFn)):  # type: ignore[misc]
+class WindowFn(with_metaclass(abc.ABCMeta,
+                              urns.RunnerApiFn)):  # type: ignore[misc]
   """An abstract windowing function defining a basic assign and merge."""
   class AssignContext(object):
     """Context passed to WindowFn.assign()."""
diff --git a/sdks/python/mypy.ini b/sdks/python/mypy.ini
index 1e4748f..151eebf 100644
--- a/sdks/python/mypy.ini
+++ b/sdks/python/mypy.ini
@@ -58,3 +58,65 @@
 [mypy-apache_beam.typehints.typehints_test_py3]
 # error: Signature of "process" incompatible with supertype "DoFn"  [override]
 ignore_errors = true
+
+
+# TODO(BEAM-7746): Remove the lines below.
+[mypy-apache_beam.coders.coders]
+ignore_errors = true
+
+[mypy-apache_beam.coders.*]
+ignore_errors = true
+
+[mypy-apache_beam.dataframe.*]
+ignore_errors = true
+
+[mypy-apache_beam.io.*]
+ignore_errors = true
+
+[mypy-apache_beam.ml.gcp.*]
+ignore_errors = true
+
+[mypy-apache_beam.pipeline]
+ignore_errors = true
+
+[mypy-apache_beam.pvalue]
+ignore_errors = true
+
+[mypy-apache_beam.runners.common]
+ignore_errors = true
+
+[mypy-apache_beam.runners.dataflow.dataflow_runner]
+ignore_errors = true
+
+[mypy-apache_beam.runners.direct.*]
+ignore_errors = true
+
+[mypy-apache_beam.runners.interactive.*]
+ignore_errors = true
+
+[mypy-apache_beam.runners.pipeline_context]
+ignore_errors = true
+
+[mypy-apache_beam.runners.portability.artifact_service]
+ignore_errors = true
+
+[mypy-apache_beam.runners.portability.fn_api_runner.*]
+ignore_errors = true
+
+[mypy-apache_beam.runners.portability.portable_runner]
+ignore_errors = true
+
+[mypy-apache_beam.runners.portability.stager]
+ignore_errors = true
+
+[mypy-apache_beam.runners.worker.*]
+ignore_errors = true
+
+[mypy-apache_beam.testing.synthetic_pipeline]
+ignore_errors = true
+
+[mypy-apache_beam.transforms.*]
+ignore_errors = true
+
+[mypy-apache_beam.typehints.*]
+ignore_errors = true
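
Note: each of these per-module sections silences every mypy error in the
matching module or package, so the blanket patterns (e.g.
apache_beam.transforms.*) suppress far more than the failures that prompted
them; per the TODO above, they are staged for removal under BEAM-7746 as
annotations land.
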
diff --git a/sdks/python/tox.ini b/sdks/python/tox.ini
index da689a7..3b66c28 100644
--- a/sdks/python/tox.ini
+++ b/sdks/python/tox.ini
@@ -193,10 +193,9 @@
 # make extras available in case any of these libs are typed
 extras =
   gcp
-# TODO: enable c test failures
 commands =
   mypy --version
-  - python setup.py mypy
+  python setup.py mypy
 
 [testenv:py37-docs]
 extras = test,gcp,docs,interactive
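
Note: in a tox commands list, a leading "- " tells tox to ignore that
command's exit status. Dropping it here means a mypy failure now fails this
environment instead of being advisory.
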
diff --git a/website/src/contribute/release-guide.md b/website/src/contribute/release-guide.md
index 8747989..3c50735 100644
--- a/website/src/contribute/release-guide.md
+++ b/website/src/contribute/release-guide.md
@@ -665,6 +665,7 @@
 
 Build python binaries in release branch in sdks/python dir.
 
+    pip install -r build-requirements.txt
     python setup.py sdist --format=zip
     cd dist
     cp apache-beam-${RELEASE}.zip staging/apache-beam-${RELEASE}-python.zip
@@ -780,7 +781,7 @@
 ```
 Create the Python SDK documentation using sphinx by running a helper script.
 ```
-cd sdks/python && tox -e docs
+cd sdks/python && pip install -r build-requirements.txt && tox -e py37-docs
 ```
 By default the Pydoc is generated in `sdks/python/target/docs/_build`. Let `${PYDOC_ROOT}` be the absolute path to `_build`.
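
Note: the new pip install -r build-requirements.txt steps assume that file
pins the build-time dependencies (code-generation tooling and the like) that
python setup.py sdist and the docs build now expect to be preinstalled in the
virtualenv before they run.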