Merge pull request #10283: [BEAM-8882] Allow Dataflow to automatically choose portability.

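Create's default expansion now always goes through Impulse (see core.py
below); the legacy Read-based expansion is only restored on Dataflow via
an override applied after the pipeline proto is built, when FnAPI is not
in use. As a result, element order is no longer deterministic unless the
redistributing reshuffle is disabled. A minimal usage sketch, assuming
only APIs already exercised by the tests in this diff (TestPipeline,
assert_that/equal_to, and Create's reshuffle argument):

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that, equal_to

    with TestPipeline() as p:
      # reshuffle=False skips the redistributing Reshuffle that Create
      # otherwise inserts; the order-sensitive tests below rely on this.
      pcoll = p | beam.Create([1, 2, 3], reshuffle=False)
      # equal_to() matches regardless of order, so it passes either way.
      assert_that(pcoll, equal_to([1, 2, 3]))
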
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index e149451..fb7ee42 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -49,7 +49,7 @@
     selected = None
     len_corpus = len(corpus)
     while not selected:
-      c = list(corpus[randrange(0, len_corpus - 1)].values())[0]
+      c = list(corpus[randrange(0, len_corpus)].values())[0]
       if c != ignore:
         selected = c
 
@@ -59,7 +59,7 @@
     selected = None
     len_words = len(words)
     while not selected:
-      c = list(words[randrange(0, len_words - 1)].values())[0]
+      c = list(words[randrange(0, len_words)].values())[0]
       if c != ignore:
         selected = c
 
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 11fb95b..031eeb3 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -36,9 +36,9 @@
 
       group_ids_pcoll = p | 'CreateGroupIds' >> beam.Create(['A', 'B', 'C'])
       corpus_pcoll = p | 'CreateCorpus' >> beam.Create(
-          [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}])
+          [{'f': 'corpus1'}, {'f': 'corpus2'}])
       words_pcoll = p | 'CreateWords' >> beam.Create(
-          [{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}])
+          [{'f': 'word1'}, {'f': 'word2'}])
       ignore_corpus_pcoll = p | 'CreateIgnoreCorpus' >> beam.Create(['corpus1'])
       ignore_word_pcoll = p | 'CreateIgnoreWord' >> beam.Create(['word1'])
 
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 4b52266..f0f53e2 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -528,11 +528,12 @@
   def test_model_pcollection(self):
     temp_path = self.create_temp_file()
     snippets.model_pcollection(['--output=%s' % temp_path])
-    self.assertEqual(self.get_output(temp_path, sorted_output=False), [
+    self.assertEqual(self.get_output(temp_path), [
+        'Or to take arms against a sea of troubles, ',
+        'The slings and arrows of outrageous fortune, ',
         'To be, or not to be: that is the question: ',
         'Whether \'tis nobler in the mind to suffer ',
-        'The slings and arrows of outrageous fortune, ',
-        'Or to take arms against a sea of troubles, '])
+    ])
 
   def test_construct_pipeline(self):
     temp_path = self.create_temp_file(
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index bbf8d3a..92e37c6 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -122,7 +122,7 @@
     with TestPipeline() as p:
       output_pcs = (
           p
-          | beam.Create(_DESTINATION_ELEMENT_PAIRS)
+          | beam.Create(_DESTINATION_ELEMENT_PAIRS, reshuffle=False)
           | beam.ParDo(fn, self.tmpdir)
           .with_outputs(fn.WRITTEN_FILE_TAG, fn.UNWRITTEN_RECORD_TAG))
 
@@ -325,7 +325,7 @@
                                   ('destination0', ['file2', 'file3'])]
     single_partition_result = [('destination1', ['file0', 'file1'])]
     with TestPipeline() as p:
-      destination_file_pairs = p | beam.Create(self._ELEMENTS)
+      destination_file_pairs = p | beam.Create(self._ELEMENTS, reshuffle=False)
       partitioned_files = (
           destination_file_pairs
           | beam.ParDo(bqfl.PartitionFiles(1000, 2))
@@ -347,7 +347,7 @@
                                   ('destination0', ['file3'])]
     single_partition_result = [('destination1', ['file0', 'file1'])]
     with TestPipeline() as p:
-      destination_file_pairs = p | beam.Create(self._ELEMENTS)
+      destination_file_pairs = p | beam.Create(self._ELEMENTS, reshuffle=False)
       partitioned_files = (
           destination_file_pairs
           | beam.ParDo(bqfl.PartitionFiles(150, 10))
@@ -533,7 +533,7 @@
 
     with TestPipeline('DirectRunner') as p:
       outputs = (p
-                 | beam.Create(_ELEMENTS)
+                 | beam.Create(_ELEMENTS, reshuffle=False)
                  | bqfl.BigQueryBatchFileLoads(
                      destination,
                      custom_gcs_temp_location=self._new_tempdir(),
@@ -660,7 +660,7 @@
         experiments='use_beam_bq_sink')
 
     with beam.Pipeline(argv=args) as p:
-      input = p | beam.Create(_ELEMENTS)
+      input = p | beam.Create(_ELEMENTS, reshuffle=False)
 
       schema_map_pcv = beam.pvalue.AsDict(
           p | "MakeSchemas" >> beam.Create(schema_kv_pairs))
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index e2bd696..0a1b211 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -851,7 +851,6 @@
 
   @staticmethod
   def get_desired_chunk_size(total_size):
-    total_size
     if total_size:
       # 1MB = 1 shard, 1GB = 32 shards, 1TB = 1000 shards, 1PB = 32k shards
       chunk_size = max(1 << 20, 1000 * int(math.sqrt(total_size)))
@@ -860,31 +859,11 @@
     return chunk_size
 
   def expand(self, pbegin):
-    from apache_beam.options.pipeline_options import DebugOptions
-    from apache_beam.transforms import util
-
-    assert isinstance(pbegin, pvalue.PBegin)
-    self.pipeline = pbegin.pipeline
-
-    debug_options = self.pipeline._options.view_as(DebugOptions)
-    if debug_options.experiments and 'beam_fn_api' in debug_options.experiments:
-      source = self.source
-
-      def split_source(unused_impulse):
-        return source.split(
-            self.get_desired_chunk_size(self.source.estimate_size()))
-
-      return (
-          pbegin
-          | core.Impulse()
-          | 'Split' >> core.FlatMap(split_source)
-          | util.Reshuffle()
-          | 'ReadSplits' >> core.FlatMap(lambda split: split.source.read(
-              split.source.get_range_tracker(
-                  split.start_position, split.stop_position))))
+    if isinstance(self.source, BoundedSource):
+      return pbegin | _SDFBoundedSourceWrapper(self.source)
     else:
       # Treat Read itself as a primitive.
-      return pvalue.PCollection(self.pipeline,
+      return pvalue.PCollection(pbegin.pipeline,
                                 is_bounded=self.source.is_bounded())
 
   def get_windowing(self, unused_inputs):
@@ -1534,7 +1513,11 @@
 
   def _create_sdf_bounded_source_dofn(self):
     source = self.source
-    chunk_size = Read.get_desired_chunk_size(source.estimate_size())
+    try:
+      estimated_size = source.estimate_size()
+    except NotImplementedError:
+      estimated_size = None
+    chunk_size = Read.get_desired_chunk_size(estimated_size)
 
     class SDFBoundedSourceDoFn(core.DoFn):
       def __init__(self, read_source):
diff --git a/sdks/python/apache_beam/io/iobase_test.py b/sdks/python/apache_beam/io/iobase_test.py
index 9772591..a574d20 100644
--- a/sdks/python/apache_beam/io/iobase_test.py
+++ b/sdks/python/apache_beam/io/iobase_test.py
@@ -198,8 +198,6 @@
       experiments = (p._options.view_as(DebugOptions).experiments or [])
 
-      # Setup experiment option to enable using SDFBoundedSourceWrapper
-      if 'use_sdf_bounded_source' not in experiments:
-        experiments.append('use_sdf_bounded_source')
+      # Set up experiment options required by this test.
       if 'beam_fn_api' not in experiments:
         # Required so mocking below doesn't mock Create used in assert_that.
         experiments.append('beam_fn_api')
diff --git a/sdks/python/apache_beam/io/parquetio_test.py b/sdks/python/apache_beam/io/parquetio_test.py
index 34f8eba..719bf55 100644
--- a/sdks/python/apache_beam/io/parquetio_test.py
+++ b/sdks/python/apache_beam/io/parquetio_test.py
@@ -312,7 +312,7 @@
       path = dst.name
       with TestPipeline() as p:
         _ = p \
-        | Create(self.RECORDS) \
+        | Create(self.RECORDS, reshuffle=False) \
         | WriteToParquet(
             path, self.SCHEMA, num_shards=1, shard_name_template='')
       with TestPipeline() as p:
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index ecfa6fb..ad336c5 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -1113,7 +1113,7 @@
       with open(file_name, 'rb') as f:
         read_result.extend(f.read().splitlines())
 
-    self.assertEqual(read_result, self.lines)
+    self.assertEqual(sorted(read_result), sorted(self.lines))
 
   def test_write_dataflow_auto_compression(self):
     pipeline = TestPipeline()
@@ -1126,7 +1126,7 @@
       with gzip.GzipFile(file_name, 'rb') as f:
         read_result.extend(f.read().splitlines())
 
-    self.assertEqual(read_result, self.lines)
+    self.assertEqual(sorted(read_result), sorted(self.lines))
 
   def test_write_dataflow_auto_compression_unsharded(self):
     pipeline = TestPipeline()
@@ -1142,7 +1142,7 @@
       with gzip.GzipFile(file_name, 'rb') as f:
         read_result.extend(f.read().splitlines())
 
-    self.assertEqual(read_result, self.lines)
+    self.assertEqual(sorted(read_result), sorted(self.lines))
 
   def test_write_dataflow_header(self):
     pipeline = TestPipeline()
@@ -1159,7 +1159,8 @@
       with gzip.GzipFile(file_name, 'rb') as f:
         read_result.extend(f.read().splitlines())
     # header_text is automatically encoded in WriteToText
-    self.assertEqual(read_result, [header_text.encode('utf-8')] + self.lines)
+    self.assertEqual(read_result[0], header_text.encode('utf-8'))
+    self.assertEqual(sorted(read_result[1:]), sorted(self.lines))
 
 
 if __name__ == '__main__':
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index 1f7ba2a..dfb154a 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -228,7 +228,7 @@
           file_name, options=tf.python_io.TFRecordOptions(
               tf.python_io.TFRecordCompressionType.GZIP)):
         actual.append(r)
-      self.assertEqual(actual, input_data)
+      self.assertEqual(sorted(actual), sorted(input_data))
 
   def test_write_record_auto(self):
     with TempDir() as temp_dir:
@@ -244,7 +244,7 @@
           file_name, options=tf.python_io.TFRecordOptions(
               tf.python_io.TFRecordCompressionType.GZIP)):
         actual.append(r)
-      self.assertEqual(actual, input_data)
+      self.assertEqual(sorted(actual), sorted(input_data))
 
 
 class TestReadFromTFRecord(unittest.TestCase):
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index c1c25d1..b1b2e68 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -253,7 +253,7 @@
 
   def test_visit_entire_graph(self):
     pipeline = Pipeline()
-    pcoll1 = pipeline | 'pcoll' >> Create([1, 2, 3])
+    pcoll1 = pipeline | 'pcoll' >> beam.Impulse()
     pcoll2 = pcoll1 | 'do1' >> FlatMap(lambda x: [x + 1])
     pcoll3 = pcoll2 | 'do2' >> FlatMap(lambda x: [x + 1])
     pcoll4 = pcoll2 | 'do3' >> FlatMap(lambda x: [x + 1])
@@ -266,9 +266,9 @@
                      set(visitor.visited))
     self.assertEqual(set(visitor.enter_composite),
                      set(visitor.leave_composite))
-    self.assertEqual(3, len(visitor.enter_composite))
-    self.assertEqual(visitor.enter_composite[2].transform, transform)
-    self.assertEqual(visitor.leave_composite[1].transform, transform)
+    self.assertEqual(2, len(visitor.enter_composite))
+    self.assertEqual(visitor.enter_composite[1].transform, transform)
+    self.assertEqual(visitor.leave_composite[0].transform, transform)
 
   def test_apply_custom_transform(self):
     pipeline = TestPipeline()
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 718ab61..1fae45d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -101,12 +101,19 @@
   # TODO: Remove the apache_beam.pipeline dependency in CreatePTransformOverride
   from apache_beam.runners.dataflow.ptransform_overrides import CreatePTransformOverride
   from apache_beam.runners.dataflow.ptransform_overrides import ReadPTransformOverride
+  from apache_beam.runners.dataflow.ptransform_overrides import JrhReadPTransformOverride
 
   _PTRANSFORM_OVERRIDES = [
-      CreatePTransformOverride(),
   ]
 
-  _SDF_PTRANSFORM_OVERRIDES = [
+  _JRH_PTRANSFORM_OVERRIDES = [
+      JrhReadPTransformOverride(),
+  ]
+
+  # These overrides should be applied after the proto representation of the
+  # graph is created.
+  _NON_PORTABLE_PTRANSFORM_OVERRIDES = [
+      CreatePTransformOverride(),
       ReadPTransformOverride(),
   ]
 
@@ -395,8 +402,10 @@
     # done before Runner API serialization, since the new proto needs to contain
     # any added PTransforms.
     pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES)
-    if apiclient._use_sdf_bounded_source(options):
-      pipeline.replace_all(DataflowRunner._SDF_PTRANSFORM_OVERRIDES)
+
+    if (apiclient._use_fnapi(options)
+        and not apiclient._use_unified_worker(options)):
+      pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)
 
     use_fnapi = apiclient._use_fnapi(options)
     from apache_beam.transforms import environments
@@ -424,6 +433,11 @@
       self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
           return_context=True, default_environment=default_environment)
 
+    else:
+      # Perform configured PTransform overrides which should not be reflected
+      # in the proto representation of the graph.
+      pipeline.replace_all(DataflowRunner._NON_PORTABLE_PTRANSFORM_OVERRIDES)
+
     # Add setup_options for all the BeamPlugin imports
     setup_options = options.view_as(SetupOptions)
     plugins = BeamPlugin.get_all_plugin_paths()
@@ -504,10 +518,10 @@
     result.metric_results = self._metrics
     return result
 
-  def _get_typehint_based_encoding(self, typehint, window_coder, use_fnapi):
+  def _get_typehint_based_encoding(self, typehint, window_coder):
     """Returns an encoding based on a typehint object."""
     return self._get_cloud_encoding(
-        self._get_coder(typehint, window_coder=window_coder), use_fnapi)
+        self._get_coder(typehint, window_coder=window_coder))
 
   @staticmethod
   def _get_coder(typehint, window_coder):
@@ -518,13 +532,12 @@
           window_coder=window_coder)
     return coders.registry.get_coder(typehint)
 
-  def _get_cloud_encoding(self, coder, use_fnapi):
+  def _get_cloud_encoding(self, coder, unused=None):
     """Returns an encoding based on a coder object."""
     if not isinstance(coder, coders.Coder):
       raise TypeError('Coder object must inherit from coders.Coder: %s.' %
                       str(coder))
-    return coder.as_cloud_object(self.proto_context
-                                 .coders if use_fnapi else None)
+    return coder.as_cloud_object(self.proto_context.coders)
 
   def _get_side_input_encoding(self, input_encoding):
     """Returns an encoding for the output of a view transform.
@@ -567,11 +580,7 @@
               output_tag].windowing.windowfn.get_window_coder())
     else:
       window_coder = None
-    from apache_beam.runners.dataflow.internal import apiclient
-    use_fnapi = apiclient._use_fnapi(
-        list(transform_node.outputs.values())[0].pipeline._options)
-    return self._get_typehint_based_encoding(element_type, window_coder,
-                                             use_fnapi)
+    return self._get_typehint_based_encoding(element_type, window_coder)
 
   def _add_step(self, step_kind, step_label, transform_node, side_tags=()):
     """Creates a Step object and adds it to the cache."""
@@ -879,6 +888,8 @@
       serialized_data = pickler.dumps(
           self._pardo_fn_data(transform_node, lookup_label))
     step.add_property(PropertyNames.SERIALIZED_FN, serialized_data)
+    # TODO(BEAM-8882): Enable once the Dataflow service doesn't reject this.
+    # step.add_property(PropertyNames.PIPELINE_PROTO_TRANSFORM_ID, transform_id)
     step.add_property(
         PropertyNames.PARALLEL_INPUT,
         {'@type': 'OutputReference',
@@ -935,10 +946,9 @@
     # Add the restriction encoding if we are a splittable DoFn
-    # and are using the Fn API on the unified worker.
+    # (i.e. one that provides a restriction coder).
     restriction_coder = transform.get_restriction_coder()
-    if (use_fnapi and use_unified_worker and restriction_coder):
+    if restriction_coder:
       step.add_property(PropertyNames.RESTRICTION_ENCODING,
-                        self._get_cloud_encoding(
-                            restriction_coder, use_fnapi))
+                        self._get_cloud_encoding(restriction_coder))
 
   @staticmethod
   def _pardo_fn_data(transform_node, get_label):
@@ -958,6 +968,7 @@
     input_step = self._cache.get_pvalue(transform_node.inputs[0])
     step = self._add_step(
         TransformNames.COMBINE, transform_node.full_label, transform_node)
+    transform_id = self.proto_context.transforms.get_id(transform_node.parent)
 
     # The data transmitted in SERIALIZED_FN is different depending on whether
     # this is a fnapi pipeline or not.
@@ -967,8 +978,7 @@
       # Fnapi pipelines send the transform ID of the CombineValues transform's
       # parent composite because Dataflow expects the ID of a CombinePerKey
       # transform.
-      serialized_data = self.proto_context.transforms.get_id(
-          transform_node.parent)
+      serialized_data = transform_id
     else:
       # Combiner functions do not take deferred side-inputs (i.e. PValues) and
       # therefore the code to handle extra args/kwargs is simpler than for the
@@ -977,6 +987,8 @@
       serialized_data = pickler.dumps((transform.fn, transform.args,
                                        transform.kwargs, ()))
     step.add_property(PropertyNames.SERIALIZED_FN, serialized_data)
+    # TODO(BEAM-8882): Enable once the Dataflow service doesn't reject this.
+    # step.add_property(PropertyNames.PIPELINE_PROTO_TRANSFORM_ID, transform_id)
     step.add_property(
         PropertyNames.PARALLEL_INPUT,
         {'@type': 'OutputReference',
@@ -985,7 +997,7 @@
     # Note that the accumulator must not have a WindowedValue encoding, while
     # the output of this step does in fact have a WindowedValue encoding.
     accumulator_encoding = self._get_cloud_encoding(
-        transform_node.transform.fn.get_accumulator_coder(), use_fnapi)
+        transform_node.transform.fn.get_accumulator_coder())
     output_encoding = self._get_encoded_output_coder(transform_node)
 
     step.encoding = output_encoding
@@ -1005,16 +1017,7 @@
       # Consider native Read to be a primitive for dataflow.
       return beam.pvalue.PCollection.from_(pbegin)
     else:
-      debug_options = options.view_as(DebugOptions)
-      if (
-          debug_options.experiments and
-          'beam_fn_api' in debug_options.experiments
-      ):
-        # Expand according to FnAPI primitives.
-        return self.apply_PTransform(transform, pbegin, options)
-      else:
-        # Custom Read is also a primitive for non-FnAPI on dataflow.
-        return beam.pvalue.PCollection.from_(pbegin)
+      return self.apply_PTransform(transform, pbegin, options)
 
   def run_Read(self, transform_node, options):
     transform = transform_node.transform
@@ -1125,8 +1128,7 @@
         coders.coders.GlobalWindowCoder())
 
     from apache_beam.runners.dataflow.internal import apiclient
-    use_fnapi = apiclient._use_fnapi(options)
-    step.encoding = self._get_cloud_encoding(coder, use_fnapi)
+    step.encoding = self._get_cloud_encoding(coder)
     step.add_property(
         PropertyNames.OUTPUT_INFO,
         [{PropertyNames.USER_NAME: (
@@ -1212,8 +1214,7 @@
     coder = coders.WindowedValueCoder(transform.sink.coder,
                                       coders.coders.GlobalWindowCoder())
     from apache_beam.runners.dataflow.internal import apiclient
-    use_fnapi = apiclient._use_fnapi(options)
-    step.encoding = self._get_cloud_encoding(coder, use_fnapi)
+    step.encoding = self._get_cloud_encoding(coder)
     step.add_property(PropertyNames.ENCODING, step.encoding)
     step.add_property(
         PropertyNames.PARALLEL_INPUT,
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index c47ab88..58c722c 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -238,13 +238,14 @@
     p | ptransform.Create([1])  # pylint: disable=expression-not-assigned
     p.run()
     job_dict = json.loads(str(remote_runner.job))
-    self.assertEqual(len(job_dict[u'steps']), 2)
+    self.assertEqual(len(job_dict[u'steps']), 3)
 
     self.assertEqual(job_dict[u'steps'][0][u'kind'], u'ParallelRead')
     self.assertEqual(
         job_dict[u'steps'][0][u'properties'][u'pubsub_subscription'],
         '_starting_signal/')
     self.assertEqual(job_dict[u'steps'][1][u'kind'], u'ParallelDo')
+    self.assertEqual(job_dict[u'steps'][2][u'kind'], u'ParallelDo')
 
   def test_biqquery_read_streaming_fail(self):
     remote_runner = DataflowRunner()
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 1ccbd13..da37813 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -894,13 +894,6 @@
       'use_unified_worker' in debug_options.experiments)
 
 
-def _use_sdf_bounded_source(pipeline_options):
-  debug_options = pipeline_options.view_as(DebugOptions)
-  return _use_fnapi(pipeline_options) and (
-      debug_options.experiments and
-      'use_sdf_bounded_source' in debug_options.experiments)
-
-
 def _get_container_image_tag():
   base_version = pkg_resources.parse_version(
       beam_version.__version__).base_version
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index fdce49b..111259d 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -106,6 +106,7 @@
   OUTPUT_INFO = 'output_info'
   OUTPUT_NAME = 'output_name'
   PARALLEL_INPUT = 'parallel_input'
+  PIPELINE_PROTO_TRANSFORM_ID = 'pipeline_proto_transform_id'
   PUBSUB_ID_LABEL = 'pubsub_id_label'
   PUBSUB_SERIALIZED_ATTRIBUTES_FN = 'pubsub_serialized_attributes_fn'
   PUBSUB_SUBSCRIPTION = 'pubsub_subscription'
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
index 828455b..a0a6541 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
@@ -188,7 +188,7 @@
     p | Create(['a', 'b', 'c']) | _NativeWrite(sink)  # pylint: disable=expression-not-assigned
     p.run()
 
-    self.assertEqual(['a', 'b', 'c'], sink.written_values)
+    self.assertEqual(['a', 'b', 'c'], sorted(sink.written_values))
 
 
 class Test_NativeWrite(unittest.TestCase):
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py b/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py
deleted file mode 100644
index 481209e..0000000
--- a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py
+++ /dev/null
@@ -1,76 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Create transform for streaming."""
-
-from __future__ import absolute_import
-
-from builtins import map
-
-from apache_beam import DoFn
-from apache_beam import ParDo
-from apache_beam import PTransform
-from apache_beam import Windowing
-from apache_beam import pvalue
-from apache_beam.transforms.window import GlobalWindows
-
-
-class StreamingCreate(PTransform):
-  """A specialized implementation for ``Create`` transform in streaming mode.
-
-  Note: There is no unbounded source API in python to wrap the Create source,
-  so we map this to composite of Impulse primitive and an SDF.
-  """
-
-  def __init__(self, values, coder):
-    self.coder = coder
-    self.encoded_values = list(map(coder.encode, values))
-
-  class DecodeAndEmitDoFn(DoFn):
-    """A DoFn which stores encoded versions of elements.
-
-    It also stores a Coder to decode and emit those elements.
-    TODO: BEAM-2422 - Make this a SplittableDoFn.
-    """
-
-    def __init__(self, encoded_values, coder):
-      self.encoded_values = encoded_values
-      self.coder = coder
-
-    def process(self, unused_element):
-      for encoded_value in self.encoded_values:
-        yield self.coder.decode(encoded_value)
-
-  class Impulse(PTransform):
-    """The Dataflow specific override for the impulse primitive."""
-
-    def expand(self, pbegin):
-      assert isinstance(pbegin, pvalue.PBegin), (
-          'Input to Impulse transform must be a PBegin but found %s' % pbegin)
-      return pvalue.PCollection(pbegin.pipeline, is_bounded=False)
-
-    def get_windowing(self, inputs):
-      return Windowing(GlobalWindows())
-
-    def infer_output_type(self, unused_input_type):
-      return bytes
-
-  def expand(self, pbegin):
-    return (pbegin
-            | 'Impulse' >> self.Impulse()
-            | 'Decode Values' >> ParDo(
-                self.DecodeAndEmitDoFn(self.encoded_values, self.coder)))
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
index 6e84c15..e3e76a5 100644
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -19,7 +19,6 @@
 
 from __future__ import absolute_import
 
-from apache_beam.coders import typecoders
 from apache_beam.pipeline import PTransformOverride
 
 
@@ -30,24 +29,24 @@
     # Imported here to avoid circular dependencies.
     # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam import Create
-    from apache_beam.options.pipeline_options import StandardOptions
+    from apache_beam.runners.dataflow.internal import apiclient
 
     if isinstance(applied_ptransform.transform, Create):
-      standard_options = (applied_ptransform
-                          .outputs[None]
-                          .pipeline._options
-                          .view_as(StandardOptions))
-      return standard_options.streaming
+      return not apiclient._use_fnapi(
+          applied_ptransform.outputs[None].pipeline._options)
     else:
       return False
 
   def get_replacement_transform(self, ptransform):
     # Imported here to avoid circular dependencies.
     # pylint: disable=wrong-import-order, wrong-import-position
-    from apache_beam.runners.dataflow.native_io.streaming_create import \
-      StreamingCreate
-    coder = typecoders.registry.get_coder(ptransform.get_output_type())
-    return StreamingCreate(ptransform.values, coder)
+    from apache_beam import PTransform
+    # Return a wrapper rather than ptransform.as_read() directly to
+    # ensure backwards compatibility of the pipeline structure.
+    class LegacyCreate(PTransform):
+      def expand(self, pbegin):
+        return pbegin | ptransform.as_read()
+    return LegacyCreate().with_output_types(ptransform.get_output_type())
 
 
 class ReadPTransformOverride(PTransformOverride):
@@ -57,11 +56,51 @@
     from apache_beam.io import Read
     from apache_beam.io.iobase import BoundedSource
     # Only overrides Read(BoundedSource) transform
-    if isinstance(applied_ptransform.transform, Read):
+    if (isinstance(applied_ptransform.transform, Read)
+        and not getattr(applied_ptransform.transform, 'override', False)):
       if isinstance(applied_ptransform.transform.source, BoundedSource):
         return True
     return False
 
   def get_replacement_transform(self, ptransform):
-    from apache_beam.io.iobase import _SDFBoundedSourceWrapper
-    return _SDFBoundedSourceWrapper(ptransform.source)
+    from apache_beam import pvalue
+    from apache_beam.io import iobase
+    class Read(iobase.Read):
+      override = True
+      def expand(self, pbegin):
+        return pvalue.PCollection(
+            self.pipeline, is_bounded=self.source.is_bounded())
+    return Read(ptransform.source).with_output_types(
+        ptransform.get_type_hints().simple_output_type('Read'))
+
+
+class JrhReadPTransformOverride(PTransformOverride):
+  """A ``PTransformOverride`` for ``Read(BoundedSource)``."""
+
+  def matches(self, applied_ptransform):
+    from apache_beam.io import Read
+    from apache_beam.io.iobase import BoundedSource
+    return (isinstance(applied_ptransform.transform, Read)
+            and isinstance(applied_ptransform.transform.source, BoundedSource))
+
+  def get_replacement_transform(self, ptransform):
+    from apache_beam.io import Read
+    from apache_beam.transforms import core
+    from apache_beam.transforms import util
+    # Make this a local to narrow what's captured in the closure.
+    source = ptransform.source
+
+    class JrhRead(core.PTransform):
+      def expand(self, pbegin):
+        return (
+            pbegin
+            | core.Impulse()
+            | 'Split' >> core.FlatMap(lambda _: source.split(
+                Read.get_desired_chunk_size(source.estimate_size())))
+            | util.Reshuffle()
+            | 'ReadSplits' >> core.FlatMap(lambda split: split.source.read(
+                split.source.get_range_tracker(
+                    split.start_position, split.stop_position))))
+
+    return JrhRead().with_output_types(
+        ptransform.get_type_hints().simple_output_type('Read'))
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index 97d4375..6d21e55 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -21,9 +21,8 @@
 import logging
 import unittest
 
+import apache_beam as beam
 from apache_beam import pvalue
-from apache_beam.io import Read
-from apache_beam.io import iobase
 from apache_beam.pipeline import Pipeline
 from apache_beam.pvalue import AsList
 from apache_beam.runners.direct import DirectRunner
@@ -51,10 +50,7 @@
       pass
 
   def test_root_transforms(self):
-    class DummySource(iobase.BoundedSource):
-      pass
-
-    root_read = Read(DummySource())
+    root_read = beam.Impulse()
     root_flatten = Flatten(pipeline=self.pipeline)
 
     pbegin = pvalue.PBegin(self.pipeline)
@@ -88,10 +84,7 @@
       def process(self, element, negatives):
         yield element
 
-    class DummySource(iobase.BoundedSource):
-      pass
-
-    root_read = Read(DummySource())
+    root_read = beam.Impulse()
 
     result = (self.pipeline
               | 'read' >> root_read
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner_test.py b/sdks/python/apache_beam/runners/direct/direct_runner_test.py
index 22a930c..95df81d 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner_test.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner_test.py
@@ -79,7 +79,7 @@
         return [element]
 
     p = Pipeline(DirectRunner())
-    pcoll = (p | beam.Create([1, 2, 3, 4, 5])
+    pcoll = (p | beam.Create([1, 2, 3, 4, 5], reshuffle=False)
              | 'Do' >> beam.ParDo(MyDoFn()))
     assert_that(pcoll, equal_to([1, 2, 3, 4, 5]))
     result = p.run()
@@ -132,6 +132,10 @@
            | beam.Create([[]]).with_output_types(beam.typehints.List[int])
            | beam.combiners.Count.Globally())
 
+  def test_impulse(self):
+    with test_pipeline.TestPipeline(runner='BundleBasedDirectRunner') as p:
+      assert_that(p | beam.Impulse(), equal_to([b'']))
+
 
 class DirectRunnerRetryTests(unittest.TestCase):
 
diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py b/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py
index fd04d4c..d9d68cc 100644
--- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py
+++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py
@@ -147,8 +147,16 @@
     super(SDFDirectRunnerTest, self).setUp()
     # Importing the following for testing the DirectRunner SDF implementation.
     from apache_beam.runners.direct import transform_evaluator
-    self._default_max_num_outputs = (
+    self._old_default_max_num_outputs = (
         transform_evaluator._ProcessElementsEvaluator.DEFAULT_MAX_NUM_OUTPUTS)
+    self._default_max_num_outputs = (
+        transform_evaluator._ProcessElementsEvaluator.DEFAULT_MAX_NUM_OUTPUTS
+        ) = 100
+
+  def tearDown(self):
+    from apache_beam.runners.direct import transform_evaluator
+    transform_evaluator._ProcessElementsEvaluator.DEFAULT_MAX_NUM_OUTPUTS = (
+        self._old_default_max_num_outputs)
 
   def run_sdf_read_pipeline(
       self, num_files, num_records_per_file, resume_count=None):
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index d451f71..c893d8b 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -83,6 +83,7 @@
         io.Read: _BoundedReadEvaluator,
         _DirectReadFromPubSub: _PubSubReadEvaluator,
         core.Flatten: _FlattenEvaluator,
+        core.Impulse: _ImpulseEvaluator,
         core.ParDo: _ParDoEvaluator,
         core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
         _StreamingGroupByKeyOnly: _StreamingGroupByKeyOnlyEvaluator,
@@ -517,6 +518,17 @@
     return TransformResult(self, bundles, [], None, None)
 
 
+class _ImpulseEvaluator(_TransformEvaluator):
+  """TransformEvaluator for Impulse transform."""
+
+  def finish_bundle(self):
+    assert len(self._outputs) == 1
+    output_pcollection = list(self._outputs)[0]
+    bundle = self._evaluation_context.create_bundle(output_pcollection)
+    bundle.output(GlobalWindows.windowed_value(b''))
+    return TransformResult(self, [bundle], [], None, None)
+
+
 class _TaggedReceivers(dict):
   """Received ParDo output and redirect to the associated output bundle."""
 
@@ -905,7 +917,7 @@
 
   # Maximum number of elements that will be produced by a Splittable DoFn before
-  # a checkpoint is requested by the runner.
-  DEFAULT_MAX_NUM_OUTPUTS = 100
+  # a checkpoint is requested by the runner. None means no limit.
+  DEFAULT_MAX_NUM_OUTPUTS = None
   # Maximum duration a Splittable DoFn will process an element before a
   # checkpoint is requested by the runner.
   DEFAULT_MAX_DURATION = 1
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
index e860226..b0433ff 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
@@ -226,7 +226,7 @@
     pipeline_proto = to_stable_runner_api(p)
 
     pipeline_info = pipeline_analyzer.PipelineInfo(pipeline_proto.components)
-    pcoll_id = 'ref_PCollection_PCollection_3'  # Output PCollection of Square
+    pcoll_id = 'ref_PCollection_PCollection_12'  # Output PCollection of Square
     cache_label1 = pipeline_info.cache_label(pcoll_id)
 
     analyzer = pipeline_analyzer.PipelineAnalyzer(self.cache_manager,
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
index 22b3b09..c45b8e3 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
@@ -88,7 +88,7 @@
   def test_pcolls_to_pcoll_id(self):
     p = beam.Pipeline(interactive_runner.InteractiveRunner())
     # pylint: disable=range-builtin-not-iterating
-    init_pcoll = p | 'Init Create' >> beam.Create(range(10))
+    init_pcoll = p | 'Init Create' >> beam.Impulse()
     _, ctx = p.to_runner_api(use_fake_coders=True, return_context=True)
     self.assertEqual(instr.pcolls_to_pcoll_id(p, ctx), {
         str(init_pcoll): 'ref_PCollection_PCollection_1'})
@@ -100,7 +100,7 @@
     _, ctx = p.to_runner_api(use_fake_coders=True, return_context=True)
     self.assertEqual(
         instr.cacheable_key(init_pcoll, instr.pcolls_to_pcoll_id(p, ctx)),
-        str(id(init_pcoll)) + '_ref_PCollection_PCollection_1')
+        str(id(init_pcoll)) + '_ref_PCollection_PCollection_10')
 
   def test_cacheable_key_with_version_map(self):
     p = beam.Pipeline(interactive_runner.InteractiveRunner())
@@ -123,8 +123,8 @@
     # init_pcoll_2 is supplied as long as the version map is given.
     self.assertEqual(
         instr.cacheable_key(init_pcoll_2, instr.pcolls_to_pcoll_id(p2, ctx), {
-            'ref_PCollection_PCollection_1': str(id(init_pcoll))}),
-        str(id(init_pcoll)) + '_ref_PCollection_PCollection_1')
+            'ref_PCollection_PCollection_10': str(id(init_pcoll))}),
+        str(id(init_pcoll)) + '_ref_PCollection_PCollection_10')
 
   def test_cache_key(self):
     p = beam.Pipeline(interactive_runner.InteractiveRunner())
@@ -137,13 +137,13 @@
 
     pin = instr.pin(p)
     self.assertEqual(pin.cache_key(init_pcoll), 'init_pcoll_' + str(
-        id(init_pcoll)) + '_ref_PCollection_PCollection_1_' + str(id(
+        id(init_pcoll)) + '_ref_PCollection_PCollection_10_' + str(id(
             init_pcoll.producer)))
     self.assertEqual(pin.cache_key(squares), 'squares_' + str(
-        id(squares)) + '_ref_PCollection_PCollection_2_' + str(id(
+        id(squares)) + '_ref_PCollection_PCollection_11_' + str(id(
             squares.producer)))
     self.assertEqual(pin.cache_key(cubes), 'cubes_' + str(
-        id(cubes)) + '_ref_PCollection_PCollection_3_' + str(id(
+        id(cubes)) + '_ref_PCollection_PCollection_12_' + str(id(
             cubes.producer)))
 
   def test_cacheables(self):
@@ -159,21 +159,21 @@
         pin._cacheable_key(init_pcoll): {
             'var': 'init_pcoll',
             'version': str(id(init_pcoll)),
-            'pcoll_id': 'ref_PCollection_PCollection_1',
+            'pcoll_id': 'ref_PCollection_PCollection_10',
             'producer_version': str(id(init_pcoll.producer)),
             'pcoll': init_pcoll
         },
         pin._cacheable_key(squares): {
             'var': 'squares',
             'version': str(id(squares)),
-            'pcoll_id': 'ref_PCollection_PCollection_2',
+            'pcoll_id': 'ref_PCollection_PCollection_11',
             'producer_version': str(id(squares.producer)),
             'pcoll': squares
         },
         pin._cacheable_key(cubes): {
             'var': 'cubes',
             'version': str(id(cubes)),
-            'pcoll_id': 'ref_PCollection_PCollection_3',
+            'pcoll_id': 'ref_PCollection_PCollection_12',
             'producer_version': str(id(cubes.producer)),
             'pcoll': cubes
         }
@@ -283,11 +283,11 @@
 
     # Mock as if cacheable PCollections are cached.
     init_pcoll_cache_key = 'init_pcoll_' + str(
-        id(init_pcoll)) + '_ref_PCollection_PCollection_1_' + str(id(
+        id(init_pcoll)) + '_ref_PCollection_PCollection_10_' + str(id(
             init_pcoll.producer))
     self._mock_write_cache(init_pcoll, init_pcoll_cache_key)
     second_pcoll_cache_key = 'second_pcoll_' + str(
-        id(second_pcoll)) + '_ref_PCollection_PCollection_2_' + str(id(
+        id(second_pcoll)) + '_ref_PCollection_PCollection_11_' + str(id(
             second_pcoll.producer))
     self._mock_write_cache(second_pcoll, second_pcoll_cache_key)
     ie.current_env().cache_manager().exists = MagicMock(return_value=True)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index ea9d02b..71343e5 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -385,9 +385,6 @@
     self._profiler_factory = profiler.Profile.factory_from_options(
         options.view_as(pipeline_options.ProfilingOptions))
 
-    if 'use_sdf_bounded_source' in experiments:
-      pipeline.replace_all(DataflowRunner._SDF_PTRANSFORM_OVERRIDES)
-
     self._latest_run_result = self.run_via_runner_api(pipeline.to_runner_api(
         default_environment=self._default_environment))
     return self._latest_run_result
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 23480ce..ef09b1f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -271,8 +271,10 @@
                 ('B', 'b', 3)]
 
     with self.create_pipeline() as p:
-      assert_that(p | beam.Create(inputs) | beam.ParDo(AddIndex()),
-                  equal_to(expected))
+      # TODO(BEAM-8893): Allow the reshuffle.
+      assert_that(
+          p | beam.Create(inputs, reshuffle=False) | beam.ParDo(AddIndex()),
+          equal_to(expected))
 
   @unittest.skip('TestStream not yet supported')
   def test_teststream_pardo_timers(self):
@@ -417,7 +419,8 @@
     with self.create_pipeline() as p:
       actual = (
           p
-          | beam.Create(elements)
+          # TODO(BEAM-8893): Allow the reshuffle.
+          | beam.Create(elements, reshuffle=False)
           # Send even and odd elements to different windows.
           | beam.Map(lambda e: window.TimestampedValue(e, ord(e) % 2))
           | beam.WindowInto(window.FixedWindows(1) if windowed
@@ -777,8 +780,10 @@
       self, monitoring_infos, urn, labels, value=None, ge_value=None):
     # TODO(ajamato): Consider adding a matcher framework
     found = 0
+    matches = []
     for mi in monitoring_infos:
       if has_urn_and_labels(mi, urn, labels):
+        matches.append(mi.metric.counter_data.int64_value)
         if ge_value is not None:
           if mi.metric.counter_data.int64_value >= ge_value:
             found = found + 1
@@ -790,8 +795,8 @@
     ge_value_str = {'ge_value' : ge_value} if ge_value else ''
     value_str = {'value' : value} if value else ''
     self.assertEqual(
-        1, found, "Found (%s) Expected only 1 monitoring_info for %s." %
-        (found, (urn, labels, value_str, ge_value_str),))
+        1, found, "Found (%s, %s); expected only 1 monitoring_info for %s." %
+        (found, matches, (urn, labels, value_str, ge_value_str),))
 
   def assert_has_distribution(
       self, monitoring_infos, urn, labels,
@@ -833,10 +838,7 @@
         (found, (urn, labels, str(description)),))
 
   def create_pipeline(self):
-    p = beam.Pipeline(runner=fn_api_runner.FnApiRunner())
-    # TODO(BEAM-8448): Fix these tests.
-    p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
-    return p
+    return beam.Pipeline(runner=fn_api_runner.FnApiRunner())
 
   def test_element_count_metrics(self):
     class GenerateTwoOutputs(beam.DoFn):
@@ -854,7 +856,8 @@
 
     # Produce enough elements to make sure byte sampling occurs.
     num_source_elems = 100
-    pcoll = p | beam.Create(['a%d' % i for i in range(num_source_elems)])
+    pcoll = p | beam.Create(
+        ['a%d' % i for i in range(num_source_elems)], reshuffle=False)
 
     # pylint: disable=expression-not-assigned
     pardo = ('StepThatDoesTwoOutputs' >> beam.ParDo(
@@ -883,13 +886,14 @@
                       and
                       monitoring_infos.PCOLLECTION_LABEL not in x.labels])
     try:
-      labels = {monitoring_infos.PCOLLECTION_LABEL : 'Impulse'}
+      labels = {
+          monitoring_infos.PCOLLECTION_LABEL : 'ref_PCollection_PCollection_1'}
       self.assert_has_counter(
           counters, monitoring_infos.ELEMENT_COUNT_URN, labels, 1)
 
-      # Create/Read, "out" output.
+      # Create output.
       labels = {monitoring_infos.PCOLLECTION_LABEL :
-                    'ref_PCollection_PCollection_1'}
+                    'ref_PCollection_PCollection_3'}
       self.assert_has_counter(
           counters,
           monitoring_infos.ELEMENT_COUNT_URN, labels, num_source_elems)
@@ -902,7 +906,7 @@
 
       # GenerateTwoOutputs, main output.
       labels = {monitoring_infos.PCOLLECTION_LABEL :
-                    'ref_PCollection_PCollection_2'}
+                    'ref_PCollection_PCollection_4'}
       self.assert_has_counter(
           counters,
           monitoring_infos.ELEMENT_COUNT_URN, labels, num_source_elems)
@@ -915,7 +919,7 @@
 
       # GenerateTwoOutputs, "SecondOutput" output.
       labels = {monitoring_infos.PCOLLECTION_LABEL :
-                    'ref_PCollection_PCollection_3'}
+                    'ref_PCollection_PCollection_5'}
       self.assert_has_counter(
           counters,
           monitoring_infos.ELEMENT_COUNT_URN, labels, 2 * num_source_elems)
@@ -928,7 +932,7 @@
 
       # GenerateTwoOutputs, "ThirdOutput" output.
       labels = {monitoring_infos.PCOLLECTION_LABEL :
-                    'ref_PCollection_PCollection_4'}
+                    'ref_PCollection_PCollection_6'}
       self.assert_has_counter(
           counters,
           monitoring_infos.ELEMENT_COUNT_URN, labels, num_source_elems)
@@ -943,7 +947,7 @@
       # outputs.
       # Flatten/Read, main output.
       labels = {monitoring_infos.PCOLLECTION_LABEL :
-                    'ref_PCollection_PCollection_5'}
+                    'ref_PCollection_PCollection_7'}
       self.assert_has_counter(
           counters,
           monitoring_infos.ELEMENT_COUNT_URN, labels, 4 * num_source_elems)
@@ -956,7 +960,7 @@
 
       # PassThrough, main output
       labels = {monitoring_infos.PCOLLECTION_LABEL :
-                    'ref_PCollection_PCollection_6'}
+                    'ref_PCollection_PCollection_8'}
       self.assert_has_counter(
           counters,
           monitoring_infos.ELEMENT_COUNT_URN, labels, 4 * num_source_elems)
@@ -969,7 +973,7 @@
 
       # PassThrough2, main output
       labels = {monitoring_infos.PCOLLECTION_LABEL :
-                    'ref_PCollection_PCollection_7'}
+                    'ref_PCollection_PCollection_9'}
       self.assert_has_counter(
           counters,
           monitoring_infos.ELEMENT_COUNT_URN, labels, num_source_elems)
@@ -1014,7 +1018,8 @@
       namespace = split[0]
       name = ':'.join(split[1:])
       assert_counter_exists(
-          all_metrics_via_montoring_infos, namespace, name, step='Create/Read')
+          all_metrics_via_montoring_infos, namespace, name,
+          step='Create/Impulse')
       assert_counter_exists(
           all_metrics_via_montoring_infos, namespace, name, step='MyStep')
 
@@ -1027,7 +1032,8 @@
     p = self.create_pipeline()
 
     _ = (p
-         | beam.Create([0, 0, 0, 5e-3 * DEFAULT_SAMPLING_PERIOD_MS])
+         | beam.Create(
+             [0, 0, 0, 5e-3 * DEFAULT_SAMPLING_PERIOD_MS], reshuffle=False)
          | beam.Map(time.sleep)
          | beam.Map(lambda x: ('key', x))
          | beam.GroupByKey()
@@ -1051,13 +1057,13 @@
       # Test the DEPRECATED legacy metrics
       pregbk_metrics, postgbk_metrics = list(
           res._metrics_by_stage.values())
-      if 'Create/Read' not in pregbk_metrics.ptransforms:
+      if 'Create/Map(decode)' not in pregbk_metrics.ptransforms:
         # The metrics above are actually unordered. Swap.
         pregbk_metrics, postgbk_metrics = postgbk_metrics, pregbk_metrics
       self.assertEqual(
           4,
-          pregbk_metrics.ptransforms['Create/Read']
-          .processed_elements.measured.output_element_counts['out'])
+          pregbk_metrics.ptransforms['Create/Map(decode)']
+          .processed_elements.measured.output_element_counts['None'])
       self.assertEqual(
           4,
           pregbk_metrics.ptransforms['Map(sleep)']
@@ -1089,20 +1095,20 @@
       self.assertEqual(2, len(res._monitoring_infos_by_stage))
       pregbk_mis, postgbk_mis = list(res._monitoring_infos_by_stage.values())
 
-      if not has_mi_for_ptransform(pregbk_mis, 'Create/Read'):
+      if not has_mi_for_ptransform(pregbk_mis, 'Create/Map(decode)'):
         # The monitoring infos above are actually unordered. Swap.
         pregbk_mis, postgbk_mis = postgbk_mis, pregbk_mis
 
       # pregbk monitoring infos
       labels = {monitoring_infos.PCOLLECTION_LABEL :
-                'ref_PCollection_PCollection_1'}
+                'ref_PCollection_PCollection_3'}
       self.assert_has_counter(
           pregbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=4)
       self.assert_has_distribution(
           pregbk_mis, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels)
 
       labels = {monitoring_infos.PCOLLECTION_LABEL :
-                'ref_PCollection_PCollection_2'}
+                'ref_PCollection_PCollection_4'}
       self.assert_has_counter(
           pregbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=4)
       self.assert_has_distribution(
@@ -1115,14 +1121,14 @@
 
       # postgbk monitoring infos
       labels = {monitoring_infos.PCOLLECTION_LABEL :
-                'ref_PCollection_PCollection_6'}
+                'ref_PCollection_PCollection_8'}
       self.assert_has_counter(
           postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=1)
       self.assert_has_distribution(
           postgbk_mis, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels)
 
       labels = {monitoring_infos.PCOLLECTION_LABEL :
-                'ref_PCollection_PCollection_7'}
+                'ref_PCollection_PCollection_9'}
       self.assert_has_counter(
           postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=5)
       self.assert_has_distribution(
@@ -1406,7 +1412,7 @@
       with self.create_pipeline() as p:
         grouped = (
             p
-            | beam.Create(elements)
+            | beam.Create(elements, reshuffle=False)
             | 'SDF' >> beam.ParDo(EnumerateSdf()))
         flat = grouped | beam.FlatMap(lambda x: x)
         assert_that(flat, equal_to(expected))
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index a1c462a..15d1770 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -274,12 +274,12 @@
                   elm=beam.DoFn.ElementParam,
                   ts=beam.DoFn.TimestampParam,
                   side=beam.DoFn.SideInputParam):
-        yield (elm, ts, side)
+        yield (elm, ts, sorted(side))
 
     records = (main_stream     # pylint: disable=unused-variable
                | beam.ParDo(RecordFn(), beam.pvalue.AsList(side)))
 
-    assert_that(records, equal_to([('e', Timestamp(10), [2, 1, 4])]))
+    assert_that(records, equal_to([('e', Timestamp(10), [1, 2, 4])]))
 
     p.run()
 
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 8794d2a..3169d53 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -38,7 +38,6 @@
 from apache_beam.coders import typecoders
 from apache_beam.internal import pickler
 from apache_beam.internal import util
-from apache_beam.options.pipeline_options import DebugOptions
 from apache_beam.options.pipeline_options import TypeOptions
 from apache_beam.portability import common_urns
 from apache_beam.portability import python_urns
@@ -2511,38 +2510,32 @@
 
   def expand(self, pbegin):
     assert isinstance(pbegin, pvalue.PBegin)
-    # Must guard against this as some legacy runners don't implement impulse.
-    debug_options = pbegin.pipeline._options.view_as(DebugOptions)
-    fn_api = (debug_options.experiments
-              and 'beam_fn_api' in debug_options.experiments)
-    if fn_api:
-      coder = typecoders.registry.get_coder(self.get_output_type())
-      serialized_values = [coder.encode(v) for v in self.values]
-      reshuffle = self.reshuffle
-      # Avoid the "redistributing" reshuffle for 0 and 1 element Creates.
-      # These special cases are often used in building up more complex
-      # transforms (e.g. Write).
+    coder = typecoders.registry.get_coder(self.get_output_type())
+    serialized_values = [coder.encode(v) for v in self.values]
+    reshuffle = self.reshuffle
+    # Avoid the "redistributing" reshuffle for 0 and 1 element Creates.
+    # These special cases are often used in building up more complex
+    # transforms (e.g. Write).
 
-      class MaybeReshuffle(PTransform):
-        def expand(self, pcoll):
-          if len(serialized_values) > 1 and reshuffle:
-            from apache_beam.transforms.util import Reshuffle
-            return pcoll | Reshuffle()
-          else:
-            return pcoll
-      return (
-          pbegin
-          | Impulse()
-          | FlatMap(lambda _: serialized_values)
-          | MaybeReshuffle()
-          | Map(coder.decode).with_output_types(self.get_output_type()))
-    else:
-      self.pipeline = pbegin.pipeline
-      from apache_beam.io import iobase
-      coder = typecoders.registry.get_coder(self.get_output_type())
-      source = self._create_source_from_iterable(self.values, coder)
-      return (pbegin.pipeline
-              | iobase.Read(source).with_output_types(self.get_output_type()))
+    class MaybeReshuffle(PTransform):
+      def expand(self, pcoll):
+        if len(serialized_values) > 1 and reshuffle:
+          from apache_beam.transforms.util import Reshuffle
+          return pcoll | Reshuffle()
+        else:
+          return pcoll
+    return (
+        pbegin
+        | Impulse()
+        | FlatMap(lambda _: serialized_values).with_output_types(bytes)
+        | MaybeReshuffle().with_output_types(bytes)
+        | Map(coder.decode).with_output_types(self.get_output_type()))
+
+  def as_read(self):
+    from apache_beam.io import iobase
+    coder = typecoders.registry.get_coder(self.get_output_type())
+    source = self._create_source_from_iterable(self.values, coder)
+    return iobase.Read(source).with_output_types(self.get_output_type())
 
   def get_windowing(self, unused_inputs):
     return Windowing(GlobalWindows())
@@ -2558,6 +2551,7 @@
     return _CreateSource(serialized_values, coder)
 
 
+@typehints.with_output_types(bytes)
 class Impulse(PTransform):
   """Impulse primitive."""
 
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 380708d..0c9459f 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -506,9 +506,10 @@
     # pylint: disable=wrong-import-order, wrong-import-position
     from apache_beam.transforms.core import Create
     # pylint: enable=wrong-import-order, wrong-import-position
-    replacements = {id(v): p | 'CreatePInput%s' % ix >> Create(v)
-                    for ix, v in enumerate(pvalues)
-                    if not isinstance(v, pvalue.PValue) and v is not None}
+    replacements = {
+        id(v): p | 'CreatePInput%s' % ix >> Create(v, reshuffle=False)
+        for ix, v in enumerate(pvalues)
+        if not isinstance(v, pvalue.PValue) and v is not None}
     pvalueish = _SetInputPValues().visit(pvalueish, replacements)
     self.pipeline = p
     result = p.apply(self, pvalueish, label)
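The implicit Create that wraps plain Python values piped into a transform now passes reshuffle=False, so the convenience wrapping does not add a redistribution step of its own. For illustration only (MyLen is a hypothetical transform, not part of the patch):

    import apache_beam as beam

    class MyLen(beam.PTransform):          # hypothetical transform, for illustration
        def expand(self, pcoll):
            return pcoll | beam.Map(len)

    # Piping raw values into a transform wraps them in Create(..., reshuffle=False)
    # behind the scenes and runs the resulting pipeline eagerly.
    print(sorted(['a', 'bb', 'ccc'] | MyLen()))    # [1, 2, 3]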
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index ad87082..ffb245c 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -84,29 +84,29 @@
                      str(PTransform()))
 
     pa = TestPipeline()
-    res = pa | 'ALabel' >> beam.Create([1, 2])
-    self.assertEqual('AppliedPTransform(ALabel/Read, Read)',
+    res = pa | 'ALabel' >> beam.Impulse()
+    self.assertEqual('AppliedPTransform(ALabel, Impulse)',
                      str(res.producer))
 
     pc = TestPipeline()
-    res = pc | beam.Create([1, 2])
+    res = pc | beam.Impulse()
     inputs_tr = res.producer.transform
     inputs_tr.inputs = ('ci',)
     self.assertEqual(
-        """<Read(PTransform) label=[Read] inputs=('ci',)>""",
+        "<Impulse(PTransform) label=[Impulse] inputs=('ci',)>",
         str(inputs_tr))
 
     pd = TestPipeline()
-    res = pd | beam.Create([1, 2])
+    res = pd | beam.Impulse()
     side_tr = res.producer.transform
     side_tr.side_inputs = (4,)
     self.assertEqual(
-        '<Read(PTransform) label=[Read] side_inputs=(4,)>',
+        '<Impulse(PTransform) label=[Impulse] side_inputs=(4,)>',
         str(side_tr))
 
     inputs_tr.side_inputs = ('cs',)
     self.assertEqual(
-        """<Read(PTransform) label=[Read] """
+        """<Impulse(PTransform) label=[Impulse] """
         """inputs=('ci',) side_inputs=('cs',)>""",
         str(inputs_tr))
 
@@ -495,7 +495,7 @@
     pipeline = TestPipeline()
     pcoll = pipeline | 'start' >> beam.Create(
         [(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)])
-    result = pcoll | 'Group' >> beam.GroupByKey()
+    result = pcoll | 'Group' >> beam.GroupByKey() | _SortLists
     assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
     pipeline.run()
 
@@ -562,7 +562,7 @@
     created = pipeline | 'A' >> beam.Create(contents)
     partitioned = created | 'B' >> beam.Partition(lambda x, n: len(x) % n, 3)
     flattened = partitioned | 'C' >> beam.Flatten()
-    grouped = flattened | 'D' >> beam.GroupByKey()
+    grouped = flattened | 'D' >> beam.GroupByKey() | _SortLists
     assert_that(grouped, equal_to([('aa', [1, 2]), ('bb', [2])]))
     pipeline.run()
 
@@ -654,7 +654,7 @@
         [('a', 1), ('a', 2), ('b', 3), ('c', 4)])
     pcoll_2 = pipeline | 'Start 2' >> beam.Create(
         [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
-    result = (pcoll_1, pcoll_2) | beam.CoGroupByKey()
+    result = (pcoll_1, pcoll_2) | beam.CoGroupByKey() | _SortLists
     assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
                                   ('b', ([3], [])),
                                   ('c', ([4], [7, 8]))]))
@@ -667,6 +667,7 @@
     pcoll_2 = pipeline | 'Start 2' >> beam.Create(
         [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
     result = [pc for pc in (pcoll_1, pcoll_2)] | beam.CoGroupByKey()
+    result |= _SortLists
     assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
                                   ('b', ([3], [])),
                                   ('c', ([4], [7, 8]))]))
@@ -679,6 +680,7 @@
     pcoll_2 = pipeline | 'Start 2' >> beam.Create(
         [('a', 5), ('a', 6), ('c', 7), ('c', 8)])
     result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey()
+    result |= _SortLists
     assert_that(result, equal_to([('a', {'X': [1, 2], 'Y': [5, 6]}),
                                   ('b', {'X': [3], 'Y': []}),
                                   ('c', {'X': [4], 'Y': [7, 8]})]))
@@ -760,8 +762,9 @@
                           ([1, 2, 3], [100]) | beam.Flatten())
     join_input = ([('k', 'a')],
                   [('k', 'b'), ('k', 'c')])
-    self.assertCountEqual([('k', (['a'], ['b', 'c']))],
-                          join_input | beam.CoGroupByKey())
+    self.assertCountEqual(
+        [('k', (['a'], ['b', 'c']))],
+        join_input | beam.CoGroupByKey() | _SortLists)
 
   def test_multi_input_ptransform(self):
     class DisjointUnion(PTransform):
@@ -908,7 +911,9 @@
   def check_label(self, ptransform, expected_label):
     pipeline = TestPipeline()
     pipeline | 'Start' >> beam.Create([('a', 1)]) | ptransform
-    actual_label = sorted(pipeline.applied_labels - {'Start', 'Start/Read'})[0]
+    actual_label = sorted(
+        label for label in pipeline.applied_labels
+        if not label.startswith('Start'))[0]
     self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label))
 
   def test_default_labels(self):
@@ -1365,11 +1370,12 @@
               | 'T' >> beam.Create(['t', 'e', 's', 't', 'i', 'n', 'g'])
               .with_output_types(str)
               | 'GenKeys' >> beam.Map(group_with_upper_ord)
-              | 'O' >> beam.GroupByKey())
+              | 'O' >> beam.GroupByKey()
+              | _SortLists)
 
     assert_that(result, equal_to([(1, ['g']),
-                                  (3, ['s', 'i', 'n']),
-                                  (4, ['t', 'e', 't'])]))
+                                  (3, ['i', 'n', 's']),
+                                  (4, ['e', 't', 't'])]))
     self.p.run()
 
   def test_pipeline_checking_satisfied_but_run_time_types_violate(self):
@@ -1415,7 +1421,8 @@
     result = (self.p
               | 'Nums' >> beam.Create(range(5)).with_output_types(int)
               | 'IsEven' >> beam.Map(is_even_as_key)
-              | 'Parity' >> beam.GroupByKey())
+              | 'Parity' >> beam.GroupByKey()
+              | _SortLists)
 
     assert_that(result, equal_to([(False, [1, 3]), (True, [0, 2, 4])]))
     self.p.run()
@@ -1429,7 +1436,7 @@
     # passed instead.
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create([1, 2, 3])
+       | beam.Create([1, 1, 1])
        | ('ToInt' >> beam.FlatMap(lambda x: [int(x)])
           .with_input_types(str).with_output_types(int)))
       self.p.run()
@@ -1474,7 +1481,7 @@
             int)).get_type_hints())
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create([1, 2, 3])
+       | beam.Create([1, 1, 1])
        | ('ToInt' >> beam.FlatMap(lambda x: [float(x)])
           .with_input_types(int).with_output_types(int))
       )
@@ -1681,7 +1688,7 @@
 
     with self.assertRaises(typehints.TypeCheckError) as e:
       (self.p
-       | beam.Create(range(3)).with_output_types(int)
+       | beam.Create([0]).with_output_types(int)
        | ('SortJoin' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
           .with_input_types(str).with_output_types(str)))
       self.p.run()
@@ -2238,5 +2245,19 @@
     p.run()
 
 
+def _sort_lists(result):
+  if isinstance(result, list):
+    return sorted(result)
+  elif isinstance(result, tuple):
+    return tuple(_sort_lists(e) for e in result)
+  elif isinstance(result, dict):
+    return {k: _sort_lists(v) for k, v in result.items()}
+  else:
+    return result
+
+
+_SortLists = beam.Map(_sort_lists)
+
+
 if __name__ == '__main__':
   unittest.main()
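Since Create now reshuffles its elements, the order of values coming out of GroupByKey/CoGroupByKey is no longer deterministic in these tests, so results are normalized with the _SortLists helper above before comparison. A quick illustration of the helper's behaviour on hypothetical grouped output:

    # _sort_lists recursively sorts lists nested inside tuples and dicts and
    # leaves every other value untouched.
    print(_sort_lists(('a', ([2, 1], [6, 5]))))    # ('a', ([1, 2], [5, 6]))
    print(_sort_lists({'X': [3, 1], 'Y': []}))     # {'X': [1, 3], 'Y': []}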
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py
index 21ef0ec..601a1d4 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -521,7 +521,7 @@
                               ('key', 2),
                               ('key', 3),
                               ('key', 4),
-                              ('key', 3)])
+                              ('key', 3)], reshuffle=False)
     actual_values = (values
                      | beam.ParDo(SetStatefulDoFn()))
 
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 7a87e60..c7f273a 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -25,6 +25,7 @@
 import contextlib
 import random
 import re
+import sys
 import time
 import typing
 import warnings
@@ -34,6 +35,7 @@
 from builtins import zip
 
 from future.utils import itervalues
+from past.builtins import long
 
 from apache_beam import coders
 from apache_beam import typehints
@@ -648,7 +650,7 @@
         key, windowed_values = element
         return [wv.with_value((key, wv.value)) for wv in windowed_values]
 
-    ungrouped = pcoll | Map(reify_timestamps)
+    ungrouped = pcoll | Map(reify_timestamps).with_output_types(typing.Any)
 
     # TODO(BEAM-8104) Using global window as one of the standard window.
     # This is to mitigate the Dataflow Java Runner Harness limitation to
@@ -660,7 +662,7 @@
         timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
     result = (ungrouped
               | GroupByKey()
-              | FlatMap(restore_timestamps))
+              | FlatMap(restore_timestamps).with_output_types(typing.Any))
     result._windowing = windowing_saved
     return result
 
@@ -680,10 +682,16 @@
   """
 
   def expand(self, pcoll):
+    if sys.version_info >= (3,):
+      KeyedT = typing.Tuple[int, T]
+    else:
+      KeyedT = typing.Tuple[long, T]  # pylint: disable=long-builtin
     return (pcoll
             | 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t))
+            .with_input_types(T).with_output_types(KeyedT)
             | ReshufflePerKey()
-            | 'RemoveRandomKeys' >> Map(lambda t: t[1]))
+            | 'RemoveRandomKeys' >> Map(lambda t: t[1])
+            .with_input_types(KeyedT).with_output_types(T))
 
   def to_runner_api_parameter(self, unused_context):
     return common_urns.composites.RESHUFFLE.urn, None
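The explicit hints appear to be there so Reshuffle preserves the element type T end to end (Tuple[int, T] while the random keys are attached, long on Python 2) instead of degrading to Any. Usage is unchanged; a minimal runnable sketch, with arbitrary element values:

    import apache_beam as beam

    with beam.Pipeline() as p:
        _ = (p
             | beam.Create(range(10), reshuffle=False)  # skip Create's own reshuffle
             | beam.Reshuffle()                         # attach random keys, group per key, drop the keys
             | beam.Map(print))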
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index 74829e5..58cf243 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -100,7 +100,7 @@
     with TestPipeline() as p:
       res = (
           p
-          | beam.Create(range(47))
+          | beam.Create(range(47), reshuffle=False)
           | beam.Map(lambda t: window.TimestampedValue(t, t))
           | beam.WindowInto(window.FixedWindows(30))
           | util.BatchElements(
@@ -351,7 +351,8 @@
 
     after_gbk = (pipeline
                  | beam.Create(data)
-                 | beam.GroupByKey())
+                 | beam.GroupByKey()
+                 | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
     assert_that(after_gbk, equal_to(expected_result), label='after_gbk')
     after_reshuffle = after_gbk | beam.Reshuffle()
     assert_that(after_reshuffle, equal_to(expected_result),
@@ -435,7 +436,8 @@
     before_reshuffle = (pipeline
                         | beam.Create(data)
                         | beam.WindowInto(GlobalWindows())
-                        | beam.GroupByKey())
+                        | beam.GroupByKey()
+                        | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
     assert_that(before_reshuffle, equal_to(expected_data),
                 label='before_reshuffle')
     after_reshuffle = before_reshuffle | beam.Reshuffle()
@@ -452,7 +454,8 @@
                         | beam.Create(data)
                         | beam.WindowInto(SlidingWindows(
                             size=window_size, period=1))
-                        | beam.GroupByKey())
+                        | beam.GroupByKey()
+                        | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
     assert_that(before_reshuffle, equal_to(expected_data),
                 label='before_reshuffle')
     after_reshuffle = before_reshuffle | beam.Reshuffle()
@@ -471,7 +474,8 @@
     before_reshuffle = (pipeline
                         | beam.Create(data)
                         | beam.WindowInto(GlobalWindows())
-                        | beam.GroupByKey())
+                        | beam.GroupByKey()
+                        | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
     assert_that(before_reshuffle, equal_to(expected_data),
                 label='before_reshuffle')
     after_reshuffle = before_reshuffle | beam.Reshuffle()
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index a405948..30430cc 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -196,6 +196,7 @@
       result = (pcoll
                 | 'w' >> WindowInto(SlidingWindows(period=2, size=4))
                 | GroupByKey()
+                | beam.MapTuple(lambda k, vs: (k, sorted(vs)))
                 | reify_windows)
       expected = [('key @ [-2.0, 2.0)', [1]),
                   ('key @ [0.0, 4.0)', [1, 2, 3]),
@@ -222,7 +223,8 @@
                 | Map(lambda x_t: TimestampedValue(x_t[0], x_t[1]))
                 | 'w' >> WindowInto(FixedWindows(5))
                 | Map(lambda v: ('key', v))
-                | GroupByKey())
+                | GroupByKey()
+                | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
       assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]),
                                     ('key', [5, 6, 7, 8, 9])]))
 
@@ -237,7 +239,8 @@
                 | 'rewindow' >> WindowInto(FixedWindows(5))
                 | 'rewindow2' >> WindowInto(FixedWindows(5))
                 | Map(lambda v: ('key', v))
-                | GroupByKey())
+                | GroupByKey()
+                | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
       assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)),
                                     ('key', sorted([5, 6, 7, 8, 9] * 3))]))
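As in the other test files, grouped values are sorted before being compared, because their iteration order now depends on how the reshuffled Create distributed the input. The same normalization in isolation, with illustrative data:

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that, equal_to

    with TestPipeline() as p:
        result = (p
                  | beam.Create([('k', 3), ('k', 1), ('k', 2)])
                  | beam.GroupByKey()                              # value order not guaranteed
                  | beam.MapTuple(lambda k, vs: (k, sorted(vs))))  # normalize for comparison
        assert_that(result, equal_to([('k', [1, 2, 3])]))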