Merge pull request #10283: [BEAM-8882] Allow Dataflow to automatically choose portability.
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
index e149451..fb7ee42 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input.py
@@ -49,7 +49,7 @@
selected = None
len_corpus = len(corpus)
while not selected:
- c = list(corpus[randrange(0, len_corpus - 1)].values())[0]
+ c = list(corpus[randrange(0, len_corpus)].values())[0]
if c != ignore:
selected = c
@@ -59,7 +59,7 @@
selected = None
len_words = len(words)
while not selected:
- c = list(words[randrange(0, len_words - 1)].values())[0]
+ c = list(words[randrange(0, len_words)].values())[0]
if c != ignore:
selected = c
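
The two hunks above fix an off-by-one: Python's random.randrange(start, stop) already excludes stop, so the old bound randrange(0, len_corpus - 1) could never return the last index and the final corpus/word entry was unreachable. A standalone illustration (plain Python, variable names hypothetical):

    import random

    corpus = ['corpus1', 'corpus2', 'corpus3']
    # Old bound: randrange(0, len(corpus) - 1) only ever yields 0 or 1,
    # so 'corpus3' could never be selected.
    samples = {random.randrange(0, len(corpus)) for _ in range(10000)}
    print(sorted(samples))  # almost surely [0, 1, 2]; the old bound gave [0, 1]
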
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
index 11fb95b..031eeb3 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_side_input_test.py
@@ -36,9 +36,9 @@
group_ids_pcoll = p | 'CreateGroupIds' >> beam.Create(['A', 'B', 'C'])
corpus_pcoll = p | 'CreateCorpus' >> beam.Create(
- [{'f': 'corpus1'}, {'f': 'corpus2'}, {'f': 'corpus3'}])
+ [{'f': 'corpus1'}, {'f': 'corpus2'}])
words_pcoll = p | 'CreateWords' >> beam.Create(
- [{'f': 'word1'}, {'f': 'word2'}, {'f': 'word3'}])
+ [{'f': 'word1'}, {'f': 'word2'}])
ignore_corpus_pcoll = p | 'CreateIgnoreCorpus' >> beam.Create(['corpus1'])
ignore_word_pcoll = p | 'CreateIgnoreWord' >> beam.Create(['word1'])
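
With the randrange fix, a three-entry list with one ignored entry leaves two equally likely picks, which would make the assertions flaky; shrinking to two entries (one of them ignored) makes the selection loop deterministic. A simplified mirror of that loop (helper name hypothetical, the {'f': ...} dict wrapping elided):

    from random import randrange

    def pick(entries, ignore):
        # Same shape as the selection loop in bigquery_side_input.py.
        selected = None
        while not selected:
            c = entries[randrange(0, len(entries))]
            if c != ignore:
                selected = c
        return selected

    # With exactly one non-ignored entry, the outcome is fixed:
    assert pick(['corpus1', 'corpus2'], 'corpus1') == 'corpus2'
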
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 4b52266..f0f53e2 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -528,11 +528,12 @@
def test_model_pcollection(self):
temp_path = self.create_temp_file()
snippets.model_pcollection(['--output=%s' % temp_path])
- self.assertEqual(self.get_output(temp_path, sorted_output=False), [
+ self.assertEqual(self.get_output(temp_path), [
+ 'Or to take arms against a sea of troubles, ',
+ 'The slings and arrows of outrageous fortune, ',
'To be, or not to be: that is the question: ',
'Whether \'tis nobler in the mind to suffer ',
- 'The slings and arrows of outrageous fortune, ',
- 'Or to take arms against a sea of troubles, '])
+ ])
def test_construct_pipeline(self):
temp_path = self.create_temp_file(
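
The test stops forcing sorted_output=False and lists the expected Hamlet lines alphabetically: under the new Create expansion the output file's line order is no longer deterministic, and the helper's default comparison mode (sorted, judging by the removed argument) is the appropriate one. A quick check that the new expectation really is pre-sorted:

    expected = [
        'Or to take arms against a sea of troubles, ',
        'The slings and arrows of outrageous fortune, ',
        'To be, or not to be: that is the question: ',
        "Whether 'tis nobler in the mind to suffer ",
    ]
    assert expected == sorted(expected)
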
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
index bbf8d3a..92e37c6 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads_test.py
@@ -122,7 +122,7 @@
with TestPipeline() as p:
output_pcs = (
p
- | beam.Create(_DESTINATION_ELEMENT_PAIRS)
+ | beam.Create(_DESTINATION_ELEMENT_PAIRS, reshuffle=False)
| beam.ParDo(fn, self.tmpdir)
.with_outputs(fn.WRITTEN_FILE_TAG, fn.UNWRITTEN_RECORD_TAG))
@@ -325,7 +325,7 @@
('destination0', ['file2', 'file3'])]
single_partition_result = [('destination1', ['file0', 'file1'])]
with TestPipeline() as p:
- destination_file_pairs = p | beam.Create(self._ELEMENTS)
+ destination_file_pairs = p | beam.Create(self._ELEMENTS, reshuffle=False)
partitioned_files = (
destination_file_pairs
| beam.ParDo(bqfl.PartitionFiles(1000, 2))
@@ -347,7 +347,7 @@
('destination0', ['file3'])]
single_partition_result = [('destination1', ['file0', 'file1'])]
with TestPipeline() as p:
- destination_file_pairs = p | beam.Create(self._ELEMENTS)
+ destination_file_pairs = p | beam.Create(self._ELEMENTS, reshuffle=False)
partitioned_files = (
destination_file_pairs
| beam.ParDo(bqfl.PartitionFiles(150, 10))
@@ -533,7 +533,7 @@
with TestPipeline('DirectRunner') as p:
outputs = (p
- | beam.Create(_ELEMENTS)
+ | beam.Create(_ELEMENTS, reshuffle=False)
| bqfl.BigQueryBatchFileLoads(
destination,
custom_gcs_temp_location=self._new_tempdir(),
@@ -660,7 +660,7 @@
experiments='use_beam_bq_sink')
with beam.Pipeline(argv=args) as p:
- input = p | beam.Create(_ELEMENTS)
+ input = p | beam.Create(_ELEMENTS, reshuffle=False)
schema_map_pcv = beam.pvalue.AsDict(
p | "MakeSchemas" >> beam.Create(schema_kv_pairs))
diff --git a/sdks/python/apache_beam/io/iobase.py b/sdks/python/apache_beam/io/iobase.py
index e2bd696..0a1b211 100644
--- a/sdks/python/apache_beam/io/iobase.py
+++ b/sdks/python/apache_beam/io/iobase.py
@@ -851,7 +851,6 @@
@staticmethod
def get_desired_chunk_size(total_size):
- total_size
if total_size:
# 1MB = 1 shard, 1GB = 32 shards, 1TB = 1000 shards, 1PB = 32k shards
chunk_size = max(1 << 20, 1000 * int(math.sqrt(total_size)))
@@ -860,31 +859,11 @@
return chunk_size
def expand(self, pbegin):
- from apache_beam.options.pipeline_options import DebugOptions
- from apache_beam.transforms import util
-
- assert isinstance(pbegin, pvalue.PBegin)
- self.pipeline = pbegin.pipeline
-
- debug_options = self.pipeline._options.view_as(DebugOptions)
- if debug_options.experiments and 'beam_fn_api' in debug_options.experiments:
- source = self.source
-
- def split_source(unused_impulse):
- return source.split(
- self.get_desired_chunk_size(self.source.estimate_size()))
-
- return (
- pbegin
- | core.Impulse()
- | 'Split' >> core.FlatMap(split_source)
- | util.Reshuffle()
- | 'ReadSplits' >> core.FlatMap(lambda split: split.source.read(
- split.source.get_range_tracker(
- split.start_position, split.stop_position))))
+ if isinstance(self.source, BoundedSource):
+ return pbegin | _SDFBoundedSourceWrapper(self.source)
else:
# Treat Read itself as a primitive.
- return pvalue.PCollection(self.pipeline,
+ return pvalue.PCollection(pbegin.pipeline,
is_bounded=self.source.is_bounded())
def get_windowing(self, unused_inputs):
@@ -1534,7 +1513,11 @@
def _create_sdf_bounded_source_dofn(self):
source = self.source
- chunk_size = Read.get_desired_chunk_size(source.estimate_size())
+ try:
+ estimated_size = source.estimate_size()
+ except NotImplementedError:
+ estimated_size = None
+ chunk_size = Read.get_desired_chunk_size(estimated_size)
class SDFBoundedSourceDoFn(core.DoFn):
def __init__(self, read_source):
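
Read.expand now routes every BoundedSource through _SDFBoundedSourceWrapper unconditionally instead of gating the Impulse/Split/Reshuffle expansion on the beam_fn_api experiment, and the chunk-size computation tolerates sources that cannot estimate their size: BoundedSource.estimate_size is optional, so NotImplementedError now means "size unknown" rather than a failure. A sketch of the pattern with a hypothetical source (helper name is illustrative):

    from apache_beam.io import iobase

    class MySource(iobase.BoundedSource):  # hypothetical; size not cheaply known
        def estimate_size(self):
            raise NotImplementedError

    def desired_chunk_size(source):
        try:
            estimated_size = source.estimate_size()
        except NotImplementedError:
            # get_desired_chunk_size falls back to a default chunk size.
            estimated_size = None
        return iobase.Read.get_desired_chunk_size(estimated_size)
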
diff --git a/sdks/python/apache_beam/io/iobase_test.py b/sdks/python/apache_beam/io/iobase_test.py
index 9772591..a574d20 100644
--- a/sdks/python/apache_beam/io/iobase_test.py
+++ b/sdks/python/apache_beam/io/iobase_test.py
@@ -198,8 +198,6 @@
experiments = (p._options.view_as(DebugOptions).experiments or [])
# Setup experiment option to enable using SDFBoundedSourceWrapper
- if 'use_sdf_bounded_source' not in experiments:
- experiments.append('use_sdf_bounded_source')
if 'beam_fn_api' not in experiments:
# Required so mocking below doesn't mock Create used in assert_that.
experiments.append('beam_fn_api')
diff --git a/sdks/python/apache_beam/io/parquetio_test.py b/sdks/python/apache_beam/io/parquetio_test.py
index 34f8eba..719bf55 100644
--- a/sdks/python/apache_beam/io/parquetio_test.py
+++ b/sdks/python/apache_beam/io/parquetio_test.py
@@ -312,7 +312,7 @@
path = dst.name
with TestPipeline() as p:
_ = p \
- | Create(self.RECORDS) \
+ | Create(self.RECORDS, reshuffle=False) \
| WriteToParquet(
path, self.SCHEMA, num_shards=1, shard_name_template='')
with TestPipeline() as p:
diff --git a/sdks/python/apache_beam/io/textio_test.py b/sdks/python/apache_beam/io/textio_test.py
index ecfa6fb..ad336c5 100644
--- a/sdks/python/apache_beam/io/textio_test.py
+++ b/sdks/python/apache_beam/io/textio_test.py
@@ -1113,7 +1113,7 @@
with open(file_name, 'rb') as f:
read_result.extend(f.read().splitlines())
- self.assertEqual(read_result, self.lines)
+ self.assertEqual(sorted(read_result), sorted(self.lines))
def test_write_dataflow_auto_compression(self):
pipeline = TestPipeline()
@@ -1126,7 +1126,7 @@
with gzip.GzipFile(file_name, 'rb') as f:
read_result.extend(f.read().splitlines())
- self.assertEqual(read_result, self.lines)
+ self.assertEqual(sorted(read_result), sorted(self.lines))
def test_write_dataflow_auto_compression_unsharded(self):
pipeline = TestPipeline()
@@ -1142,7 +1142,7 @@
with gzip.GzipFile(file_name, 'rb') as f:
read_result.extend(f.read().splitlines())
- self.assertEqual(read_result, self.lines)
+ self.assertEqual(sorted(read_result), sorted(self.lines))
def test_write_dataflow_header(self):
pipeline = TestPipeline()
@@ -1159,7 +1159,8 @@
with gzip.GzipFile(file_name, 'rb') as f:
read_result.extend(f.read().splitlines())
# header_text is automatically encoded in WriteToText
- self.assertEqual(read_result, [header_text.encode('utf-8')] + self.lines)
+ self.assertEqual(read_result[0], header_text.encode('utf-8'))
+ self.assertEqual(sorted(read_result[1:]), sorted(self.lines))
if __name__ == '__main__':
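
Writes no longer preserve element order, so these assertions compare sorted contents; the header check stays positional because WriteToText emits the header before any elements in the output file (this test reads a single file). In miniature:

    read_result = [b'# header', b'b', b'a']  # order of body lines may vary
    assert read_result[0] == b'# header'     # header is always first
    assert sorted(read_result[1:]) == [b'a', b'b']
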
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py b/sdks/python/apache_beam/io/tfrecordio_test.py
index 1f7ba2a..dfb154a 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -228,7 +228,7 @@
file_name, options=tf.python_io.TFRecordOptions(
tf.python_io.TFRecordCompressionType.GZIP)):
actual.append(r)
- self.assertEqual(actual, input_data)
+ self.assertEqual(sorted(actual), sorted(input_data))
def test_write_record_auto(self):
with TempDir() as temp_dir:
@@ -244,7 +244,7 @@
file_name, options=tf.python_io.TFRecordOptions(
tf.python_io.TFRecordCompressionType.GZIP)):
actual.append(r)
- self.assertEqual(actual, input_data)
+ self.assertEqual(sorted(actual), sorted(input_data))
class TestReadFromTFRecord(unittest.TestCase):
diff --git a/sdks/python/apache_beam/pipeline_test.py b/sdks/python/apache_beam/pipeline_test.py
index c1c25d1..b1b2e68 100644
--- a/sdks/python/apache_beam/pipeline_test.py
+++ b/sdks/python/apache_beam/pipeline_test.py
@@ -253,7 +253,7 @@
def test_visit_entire_graph(self):
pipeline = Pipeline()
- pcoll1 = pipeline | 'pcoll' >> Create([1, 2, 3])
+ pcoll1 = pipeline | 'pcoll' >> beam.Impulse()
pcoll2 = pcoll1 | 'do1' >> FlatMap(lambda x: [x + 1])
pcoll3 = pcoll2 | 'do2' >> FlatMap(lambda x: [x + 1])
pcoll4 = pcoll2 | 'do3' >> FlatMap(lambda x: [x + 1])
@@ -266,9 +266,9 @@
set(visitor.visited))
self.assertEqual(set(visitor.enter_composite),
set(visitor.leave_composite))
- self.assertEqual(3, len(visitor.enter_composite))
- self.assertEqual(visitor.enter_composite[2].transform, transform)
- self.assertEqual(visitor.leave_composite[1].transform, transform)
+ self.assertEqual(2, len(visitor.enter_composite))
+ self.assertEqual(visitor.enter_composite[1].transform, transform)
+ self.assertEqual(visitor.leave_composite[0].transform, transform)
def test_apply_custom_transform(self):
pipeline = TestPipeline()
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 718ab61..1fae45d 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -101,12 +101,19 @@
# TODO: Remove the apache_beam.pipeline dependency in CreatePTransformOverride
from apache_beam.runners.dataflow.ptransform_overrides import CreatePTransformOverride
from apache_beam.runners.dataflow.ptransform_overrides import ReadPTransformOverride
+ from apache_beam.runners.dataflow.ptransform_overrides import JrhReadPTransformOverride
_PTRANSFORM_OVERRIDES = [
- CreatePTransformOverride(),
]
- _SDF_PTRANSFORM_OVERRIDES = [
+ _JRH_PTRANSFORM_OVERRIDES = [
+ JrhReadPTransformOverride(),
+ ]
+
+ # These overrides should be applied after the proto representation of the
+ # graph is created.
+ _NON_PORTABLE_PTRANSFORM_OVERRIDES = [
+ CreatePTransformOverride(),
ReadPTransformOverride(),
]
@@ -395,8 +402,10 @@
# done before Runner API serialization, since the new proto needs to contain
# any added PTransforms.
pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES)
- if apiclient._use_sdf_bounded_source(options):
- pipeline.replace_all(DataflowRunner._SDF_PTRANSFORM_OVERRIDES)
+
+ if (apiclient._use_fnapi(options)
+ and not apiclient._use_unified_worker(options)):
+ pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)
use_fnapi = apiclient._use_fnapi(options)
from apache_beam.transforms import environments
@@ -424,6 +433,11 @@
self.proto_pipeline, self.proto_context = pipeline.to_runner_api(
return_context=True, default_environment=default_environment)
+ else:
+ # Performing configured PTransform overrides which should not be reflected
+ # in the proto representation of the graph.
+ pipeline.replace_all(DataflowRunner._NON_PORTABLE_PTRANSFORM_OVERRIDES)
+
# Add setup_options for all the BeamPlugin imports
setup_options = options.view_as(SetupOptions)
plugins = BeamPlugin.get_all_plugin_paths()
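
Taken together with the previous hunk, override application is now staged in three phases. Roughly (a simplified restatement of the control flow in dataflow_runner.py, not a verbatim excerpt):

    pipeline.replace_all(DataflowRunner._PTRANSFORM_OVERRIDES)  # now empty
    if apiclient._use_fnapi(options) and not apiclient._use_unified_worker(options):
        # The JRH worker needs bounded reads pre-expanded into
        # Impulse + Split + Reshuffle *before* the proto is generated.
        pipeline.replace_all(DataflowRunner._JRH_PTRANSFORM_OVERRIDES)
    if apiclient._use_fnapi(options):
        proto_pipeline, proto_context = pipeline.to_runner_api(return_context=True)
    else:
        # Legacy path only: swap Create/Read to non-portable primitives
        # *after* the (skipped) proto generation, so the replacements never
        # leak into the portable representation.
        pipeline.replace_all(DataflowRunner._NON_PORTABLE_PTRANSFORM_OVERRIDES)
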
@@ -504,10 +518,10 @@
result.metric_results = self._metrics
return result
- def _get_typehint_based_encoding(self, typehint, window_coder, use_fnapi):
+ def _get_typehint_based_encoding(self, typehint, window_coder):
"""Returns an encoding based on a typehint object."""
return self._get_cloud_encoding(
- self._get_coder(typehint, window_coder=window_coder), use_fnapi)
+ self._get_coder(typehint, window_coder=window_coder))
@staticmethod
def _get_coder(typehint, window_coder):
@@ -518,13 +532,12 @@
window_coder=window_coder)
return coders.registry.get_coder(typehint)
- def _get_cloud_encoding(self, coder, use_fnapi):
+ def _get_cloud_encoding(self, coder, unused=None):
"""Returns an encoding based on a coder object."""
if not isinstance(coder, coders.Coder):
raise TypeError('Coder object must inherit from coders.Coder: %s.' %
str(coder))
- return coder.as_cloud_object(self.proto_context
- .coders if use_fnapi else None)
+ return coder.as_cloud_object(self.proto_context.coders)
def _get_side_input_encoding(self, input_encoding):
"""Returns an encoding for the output of a view transform.
@@ -567,11 +580,7 @@
output_tag].windowing.windowfn.get_window_coder())
else:
window_coder = None
- from apache_beam.runners.dataflow.internal import apiclient
- use_fnapi = apiclient._use_fnapi(
- list(transform_node.outputs.values())[0].pipeline._options)
- return self._get_typehint_based_encoding(element_type, window_coder,
- use_fnapi)
+ return self._get_typehint_based_encoding(element_type, window_coder)
def _add_step(self, step_kind, step_label, transform_node, side_tags=()):
"""Creates a Step object and adds it to the cache."""
@@ -879,6 +888,8 @@
serialized_data = pickler.dumps(
self._pardo_fn_data(transform_node, lookup_label))
step.add_property(PropertyNames.SERIALIZED_FN, serialized_data)
+ # TODO(BEAM-8882): Enable once dataflow service doesn't reject this.
+ # step.add_property(PropertyNames.PIPELINE_PROTO_TRANSFORM_ID, transform_id)
step.add_property(
PropertyNames.PARALLEL_INPUT,
{'@type': 'OutputReference',
@@ -935,10 +946,9 @@
# Add the restriction encoding if we are a splittable DoFn
# and are using the Fn API on the unified worker.
restriction_coder = transform.get_restriction_coder()
- if (use_fnapi and use_unified_worker and restriction_coder):
+ if restriction_coder:
step.add_property(PropertyNames.RESTRICTION_ENCODING,
- self._get_cloud_encoding(
- restriction_coder, use_fnapi))
+ self._get_cloud_encoding(restriction_coder))
@staticmethod
def _pardo_fn_data(transform_node, get_label):
@@ -958,6 +968,7 @@
input_step = self._cache.get_pvalue(transform_node.inputs[0])
step = self._add_step(
TransformNames.COMBINE, transform_node.full_label, transform_node)
+ transform_id = self.proto_context.transforms.get_id(transform_node.parent)
# The data transmitted in SERIALIZED_FN is different depending on whether
# this is a fnapi pipeline or not.
@@ -967,8 +978,7 @@
# Fnapi pipelines send the transform ID of the CombineValues transform's
# parent composite because Dataflow expects the ID of a CombinePerKey
# transform.
- serialized_data = self.proto_context.transforms.get_id(
- transform_node.parent)
+ serialized_data = transform_id
else:
# Combiner functions do not take deferred side-inputs (i.e. PValues) and
# therefore the code to handle extra args/kwargs is simpler than for the
@@ -977,6 +987,8 @@
serialized_data = pickler.dumps((transform.fn, transform.args,
transform.kwargs, ()))
step.add_property(PropertyNames.SERIALIZED_FN, serialized_data)
+ # TODO(BEAM-8882): Enable once dataflow service doesn't reject this.
+ # step.add_property(PropertyNames.PIPELINE_PROTO_TRANSFORM_ID, transform_id)
step.add_property(
PropertyNames.PARALLEL_INPUT,
{'@type': 'OutputReference',
@@ -985,7 +997,7 @@
# Note that the accumulator must not have a WindowedValue encoding, while
# the output of this step does in fact have a WindowedValue encoding.
accumulator_encoding = self._get_cloud_encoding(
- transform_node.transform.fn.get_accumulator_coder(), use_fnapi)
+ transform_node.transform.fn.get_accumulator_coder())
output_encoding = self._get_encoded_output_coder(transform_node)
step.encoding = output_encoding
@@ -1005,16 +1017,7 @@
# Consider native Read to be a primitive for dataflow.
return beam.pvalue.PCollection.from_(pbegin)
else:
- debug_options = options.view_as(DebugOptions)
- if (
- debug_options.experiments and
- 'beam_fn_api' in debug_options.experiments
- ):
- # Expand according to FnAPI primitives.
- return self.apply_PTransform(transform, pbegin, options)
- else:
- # Custom Read is also a primitive for non-FnAPI on dataflow.
- return beam.pvalue.PCollection.from_(pbegin)
+ return self.apply_PTransform(transform, pbegin, options)
def run_Read(self, transform_node, options):
transform = transform_node.transform
@@ -1125,8 +1128,7 @@
coders.coders.GlobalWindowCoder())
from apache_beam.runners.dataflow.internal import apiclient
- use_fnapi = apiclient._use_fnapi(options)
- step.encoding = self._get_cloud_encoding(coder, use_fnapi)
+ step.encoding = self._get_cloud_encoding(coder)
step.add_property(
PropertyNames.OUTPUT_INFO,
[{PropertyNames.USER_NAME: (
@@ -1212,8 +1214,7 @@
coder = coders.WindowedValueCoder(transform.sink.coder,
coders.coders.GlobalWindowCoder())
from apache_beam.runners.dataflow.internal import apiclient
- use_fnapi = apiclient._use_fnapi(options)
- step.encoding = self._get_cloud_encoding(coder, use_fnapi)
+ step.encoding = self._get_cloud_encoding(coder)
step.add_property(PropertyNames.ENCODING, step.encoding)
step.add_property(
PropertyNames.PARALLEL_INPUT,
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index c47ab88..58c722c 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -238,13 +238,14 @@
p | ptransform.Create([1]) # pylint: disable=expression-not-assigned
p.run()
job_dict = json.loads(str(remote_runner.job))
- self.assertEqual(len(job_dict[u'steps']), 2)
+ self.assertEqual(len(job_dict[u'steps']), 3)
self.assertEqual(job_dict[u'steps'][0][u'kind'], u'ParallelRead')
self.assertEqual(
job_dict[u'steps'][0][u'properties'][u'pubsub_subscription'],
'_starting_signal/')
self.assertEqual(job_dict[u'steps'][1][u'kind'], u'ParallelDo')
+ self.assertEqual(job_dict[u'steps'][2][u'kind'], u'ParallelDo')
def test_biqquery_read_streaming_fail(self):
remote_runner = DataflowRunner()
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index 1ccbd13..da37813 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -894,13 +894,6 @@
'use_unified_worker' in debug_options.experiments)
-def _use_sdf_bounded_source(pipeline_options):
- debug_options = pipeline_options.view_as(DebugOptions)
- return _use_fnapi(pipeline_options) and (
- debug_options.experiments and
- 'use_sdf_bounded_source' in debug_options.experiments)
-
-
def _get_container_image_tag():
base_version = pkg_resources.parse_version(
beam_version.__version__).base_version
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/names.py b/sdks/python/apache_beam/runners/dataflow/internal/names.py
index fdce49b..111259d 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/names.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/names.py
@@ -106,6 +106,7 @@
OUTPUT_INFO = 'output_info'
OUTPUT_NAME = 'output_name'
PARALLEL_INPUT = 'parallel_input'
+ PIPELINE_PROTO_TRANSFORM_ID = 'pipeline_proto_transform_id'
PUBSUB_ID_LABEL = 'pubsub_id_label'
PUBSUB_SERIALIZED_ATTRIBUTES_FN = 'pubsub_serialized_attributes_fn'
PUBSUB_SUBSCRIPTION = 'pubsub_subscription'
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py b/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
index 828455b..a0a6541 100644
--- a/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/native_io/iobase_test.py
@@ -188,7 +188,7 @@
p | Create(['a', 'b', 'c']) | _NativeWrite(sink) # pylint: disable=expression-not-assigned
p.run()
- self.assertEqual(['a', 'b', 'c'], sink.written_values)
+ self.assertEqual(['a', 'b', 'c'], sorted(sink.written_values))
class Test_NativeWrite(unittest.TestCase):
diff --git a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py b/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py
deleted file mode 100644
index 481209e..0000000
--- a/sdks/python/apache_beam/runners/dataflow/native_io/streaming_create.py
+++ /dev/null
@@ -1,76 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Create transform for streaming."""
-
-from __future__ import absolute_import
-
-from builtins import map
-
-from apache_beam import DoFn
-from apache_beam import ParDo
-from apache_beam import PTransform
-from apache_beam import Windowing
-from apache_beam import pvalue
-from apache_beam.transforms.window import GlobalWindows
-
-
-class StreamingCreate(PTransform):
- """A specialized implementation for ``Create`` transform in streaming mode.
-
- Note: There is no unbounded source API in python to wrap the Create source,
- so we map this to composite of Impulse primitive and an SDF.
- """
-
- def __init__(self, values, coder):
- self.coder = coder
- self.encoded_values = list(map(coder.encode, values))
-
- class DecodeAndEmitDoFn(DoFn):
- """A DoFn which stores encoded versions of elements.
-
- It also stores a Coder to decode and emit those elements.
- TODO: BEAM-2422 - Make this a SplittableDoFn.
- """
-
- def __init__(self, encoded_values, coder):
- self.encoded_values = encoded_values
- self.coder = coder
-
- def process(self, unused_element):
- for encoded_value in self.encoded_values:
- yield self.coder.decode(encoded_value)
-
- class Impulse(PTransform):
- """The Dataflow specific override for the impulse primitive."""
-
- def expand(self, pbegin):
- assert isinstance(pbegin, pvalue.PBegin), (
- 'Input to Impulse transform must be a PBegin but found %s' % pbegin)
- return pvalue.PCollection(pbegin.pipeline, is_bounded=False)
-
- def get_windowing(self, inputs):
- return Windowing(GlobalWindows())
-
- def infer_output_type(self, unused_input_type):
- return bytes
-
- def expand(self, pbegin):
- return (pbegin
- | 'Impulse' >> self.Impulse()
- | 'Decode Values' >> ParDo(
- self.DecodeAndEmitDoFn(self.encoded_values, self.coder)))
diff --git a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
index 6e84c15..e3e76a5 100644
--- a/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
+++ b/sdks/python/apache_beam/runners/dataflow/ptransform_overrides.py
@@ -19,7 +19,6 @@
from __future__ import absolute_import
-from apache_beam.coders import typecoders
from apache_beam.pipeline import PTransformOverride
@@ -30,24 +29,24 @@
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam import Create
- from apache_beam.options.pipeline_options import StandardOptions
+ from apache_beam.runners.dataflow.internal import apiclient
if isinstance(applied_ptransform.transform, Create):
- standard_options = (applied_ptransform
- .outputs[None]
- .pipeline._options
- .view_as(StandardOptions))
- return standard_options.streaming
+ return not apiclient._use_fnapi(
+ applied_ptransform.outputs[None].pipeline._options)
else:
return False
def get_replacement_transform(self, ptransform):
# Imported here to avoid circular dependencies.
# pylint: disable=wrong-import-order, wrong-import-position
- from apache_beam.runners.dataflow.native_io.streaming_create import \
- StreamingCreate
- coder = typecoders.registry.get_coder(ptransform.get_output_type())
- return StreamingCreate(ptransform.values, coder)
+ from apache_beam import PTransform
+ # Return a wrapper rather than ptransform.as_read() directly to
+ # ensure backwards compatibility of the pipeline structure.
+ class LegacyCreate(PTransform):
+ def expand(self, pbegin):
+ return pbegin | ptransform.as_read()
+ return LegacyCreate().with_output_types(ptransform.get_output_type())
class ReadPTransformOverride(PTransformOverride):
@@ -57,11 +56,51 @@
from apache_beam.io import Read
from apache_beam.io.iobase import BoundedSource
# Only overrides Read(BoundedSource) transform
- if isinstance(applied_ptransform.transform, Read):
+ if (isinstance(applied_ptransform.transform, Read)
+ and not getattr(applied_ptransform.transform, 'override', False)):
if isinstance(applied_ptransform.transform.source, BoundedSource):
return True
return False
def get_replacement_transform(self, ptransform):
- from apache_beam.io.iobase import _SDFBoundedSourceWrapper
- return _SDFBoundedSourceWrapper(ptransform.source)
+ from apache_beam import pvalue
+ from apache_beam.io import iobase
+ class Read(iobase.Read):
+ override = True
+ def expand(self, pbegin):
+ return pvalue.PCollection(
+ self.pipeline, is_bounded=self.source.is_bounded())
+ return Read(ptransform.source).with_output_types(
+ ptransform.get_type_hints().simple_output_type('Read'))
+
+
+class JrhReadPTransformOverride(PTransformOverride):
+ """A ``PTransformOverride`` for ``Read(BoundedSource)``"""
+
+ def matches(self, applied_ptransform):
+ from apache_beam.io import Read
+ from apache_beam.io.iobase import BoundedSource
+ return (isinstance(applied_ptransform.transform, Read)
+ and isinstance(applied_ptransform.transform.source, BoundedSource))
+
+ def get_replacement_transform(self, ptransform):
+ from apache_beam.io import Read
+ from apache_beam.transforms import core
+ from apache_beam.transforms import util
+ # Make this a local to narrow what's captured in the closure.
+ source = ptransform.source
+
+ class JrhRead(core.PTransform):
+ def expand(self, pbegin):
+ return (
+ pbegin
+ | core.Impulse()
+ | 'Split' >> core.FlatMap(lambda _: source.split(
+ Read.get_desired_chunk_size(source.estimate_size())))
+ | util.Reshuffle()
+ | 'ReadSplits' >> core.FlatMap(lambda split: split.source.read(
+ split.source.get_range_tracker(
+ split.start_position, split.stop_position))))
+
+ return JrhRead().with_output_types(
+ ptransform.get_type_hints().simple_output_type('Read'))
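
Two details in these overrides are easy to miss. LegacyCreate wraps ptransform.as_read() in a composite purely to keep the pre-existing pipeline structure (and step naming) stable. And the replacement Read subclass sets override = True because it is itself an instance of the type being matched; the getattr guard in matches() is what stops the override from matching its own replacement and recursing. The guard pattern, restated compactly:

    def matches(self, applied_ptransform):
        transform = applied_ptransform.transform
        # Skip instances we already produced (they carry override=True).
        return (isinstance(transform, Read)
                and not getattr(transform, 'override', False)
                and isinstance(transform.source, BoundedSource))
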
diff --git a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
index 97d4375..6d21e55 100644
--- a/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
+++ b/sdks/python/apache_beam/runners/direct/consumer_tracking_pipeline_visitor_test.py
@@ -21,9 +21,8 @@
import logging
import unittest
+import apache_beam as beam
from apache_beam import pvalue
-from apache_beam.io import Read
-from apache_beam.io import iobase
from apache_beam.pipeline import Pipeline
from apache_beam.pvalue import AsList
from apache_beam.runners.direct import DirectRunner
@@ -51,10 +50,7 @@
pass
def test_root_transforms(self):
- class DummySource(iobase.BoundedSource):
- pass
-
- root_read = Read(DummySource())
+ root_read = beam.Impulse()
root_flatten = Flatten(pipeline=self.pipeline)
pbegin = pvalue.PBegin(self.pipeline)
@@ -88,10 +84,7 @@
def process(self, element, negatives):
yield element
- class DummySource(iobase.BoundedSource):
- pass
-
- root_read = Read(DummySource())
+ root_read = beam.Impulse()
result = (self.pipeline
| 'read' >> root_read
diff --git a/sdks/python/apache_beam/runners/direct/direct_runner_test.py b/sdks/python/apache_beam/runners/direct/direct_runner_test.py
index 22a930c..95df81d 100644
--- a/sdks/python/apache_beam/runners/direct/direct_runner_test.py
+++ b/sdks/python/apache_beam/runners/direct/direct_runner_test.py
@@ -79,7 +79,7 @@
return [element]
p = Pipeline(DirectRunner())
- pcoll = (p | beam.Create([1, 2, 3, 4, 5])
+ pcoll = (p | beam.Create([1, 2, 3, 4, 5], reshuffle=False)
| 'Do' >> beam.ParDo(MyDoFn()))
assert_that(pcoll, equal_to([1, 2, 3, 4, 5]))
result = p.run()
@@ -132,6 +132,10 @@
| beam.Create([[]]).with_output_types(beam.typehints.List[int])
| beam.combiners.Count.Globally())
+ def test_impulse(self):
+ with test_pipeline.TestPipeline(runner='BundleBasedDirectRunner') as p:
+ assert_that(p | beam.Impulse(), equal_to([b'']))
+
class DirectRunnerRetryTests(unittest.TestCase):
diff --git a/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py b/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py
index fd04d4c..d9d68cc 100644
--- a/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py
+++ b/sdks/python/apache_beam/runners/direct/sdf_direct_runner_test.py
@@ -147,8 +147,16 @@
super(SDFDirectRunnerTest, self).setUp()
# Importing following for DirectRunner SDF implemenation for testing.
from apache_beam.runners.direct import transform_evaluator
- self._default_max_num_outputs = (
+ self._old_default_max_num_outputs = (
transform_evaluator._ProcessElementsEvaluator.DEFAULT_MAX_NUM_OUTPUTS)
+ self._default_max_num_outputs = (
+ transform_evaluator._ProcessElementsEvaluator.DEFAULT_MAX_NUM_OUTPUTS
+ ) = 100
+
+ def tearDown(self):
+ from apache_beam.runners.direct import transform_evaluator
+ transform_evaluator._ProcessElementsEvaluator.DEFAULT_MAX_NUM_OUTPUTS = (
+ self._old_default_max_num_outputs)
def run_sdf_read_pipeline(
self, num_files, num_records_per_file, resume_count=None):
diff --git a/sdks/python/apache_beam/runners/direct/transform_evaluator.py b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
index d451f71..c893d8b 100644
--- a/sdks/python/apache_beam/runners/direct/transform_evaluator.py
+++ b/sdks/python/apache_beam/runners/direct/transform_evaluator.py
@@ -83,6 +83,7 @@
io.Read: _BoundedReadEvaluator,
_DirectReadFromPubSub: _PubSubReadEvaluator,
core.Flatten: _FlattenEvaluator,
+ core.Impulse: _ImpulseEvaluator,
core.ParDo: _ParDoEvaluator,
core._GroupByKeyOnly: _GroupByKeyOnlyEvaluator,
_StreamingGroupByKeyOnly: _StreamingGroupByKeyOnlyEvaluator,
@@ -517,6 +518,17 @@
return TransformResult(self, bundles, [], None, None)
+class _ImpulseEvaluator(_TransformEvaluator):
+ """TransformEvaluator for Impulse transform."""
+
+ def finish_bundle(self):
+ assert len(self._outputs) == 1
+ output_pcollection = list(self._outputs)[0]
+ bundle = self._evaluation_context.create_bundle(output_pcollection)
+ bundle.output(GlobalWindows.windowed_value(b''))
+ return TransformResult(self, [bundle], [], None, None)
+
+
class _TaggedReceivers(dict):
"""Received ParDo output and redirect to the associated output bundle."""
@@ -905,7 +917,7 @@
# Maximum number of elements that will be produced by a Splittable DoFn before
# a checkpoint is requested by the runner.
- DEFAULT_MAX_NUM_OUTPUTS = 100
+ DEFAULT_MAX_NUM_OUTPUTS = None
# Maximum duration a Splittable DoFn will process an element before a
# checkpoint is requested by the runner.
DEFAULT_MAX_DURATION = 1
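
Two DirectRunner changes land here. Impulse gets a first-class evaluator that emits exactly one empty bytes element in the global window, which is what Create's new expansion consumes. And DEFAULT_MAX_NUM_OUTPUTS drops to None, so splittable DoFns are no longer checkpointed by output count by default; sdf_direct_runner_test.py above now pins it to 100 per test and restores the old value in tearDown. The primitive can be verified directly (mirroring the new test in direct_runner_test.py):

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that, equal_to

    with TestPipeline(runner='BundleBasedDirectRunner') as p:
        assert_that(p | beam.Impulse(), equal_to([b'']))
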
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
index e860226..b0433ff 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_analyzer_test.py
@@ -226,7 +226,7 @@
pipeline_proto = to_stable_runner_api(p)
pipeline_info = pipeline_analyzer.PipelineInfo(pipeline_proto.components)
- pcoll_id = 'ref_PCollection_PCollection_3' # Output PCollection of Square
+ pcoll_id = 'ref_PCollection_PCollection_12' # Output PCollection of Square
cache_label1 = pipeline_info.cache_label(pcoll_id)
analyzer = pipeline_analyzer.PipelineAnalyzer(self.cache_manager,
diff --git a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
index 22b3b09..c45b8e3 100644
--- a/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
+++ b/sdks/python/apache_beam/runners/interactive/pipeline_instrument_test.py
@@ -88,7 +88,7 @@
def test_pcolls_to_pcoll_id(self):
p = beam.Pipeline(interactive_runner.InteractiveRunner())
# pylint: disable=range-builtin-not-iterating
- init_pcoll = p | 'Init Create' >> beam.Create(range(10))
+ init_pcoll = p | 'Init Create' >> beam.Impulse()
_, ctx = p.to_runner_api(use_fake_coders=True, return_context=True)
self.assertEqual(instr.pcolls_to_pcoll_id(p, ctx), {
str(init_pcoll): 'ref_PCollection_PCollection_1'})
@@ -100,7 +100,7 @@
_, ctx = p.to_runner_api(use_fake_coders=True, return_context=True)
self.assertEqual(
instr.cacheable_key(init_pcoll, instr.pcolls_to_pcoll_id(p, ctx)),
- str(id(init_pcoll)) + '_ref_PCollection_PCollection_1')
+ str(id(init_pcoll)) + '_ref_PCollection_PCollection_10')
def test_cacheable_key_with_version_map(self):
p = beam.Pipeline(interactive_runner.InteractiveRunner())
@@ -123,8 +123,8 @@
# init_pcoll_2 is supplied as long as the version map is given.
self.assertEqual(
instr.cacheable_key(init_pcoll_2, instr.pcolls_to_pcoll_id(p2, ctx), {
- 'ref_PCollection_PCollection_1': str(id(init_pcoll))}),
- str(id(init_pcoll)) + '_ref_PCollection_PCollection_1')
+ 'ref_PCollection_PCollection_10': str(id(init_pcoll))}),
+ str(id(init_pcoll)) + '_ref_PCollection_PCollection_10')
def test_cache_key(self):
p = beam.Pipeline(interactive_runner.InteractiveRunner())
@@ -137,13 +137,13 @@
pin = instr.pin(p)
self.assertEqual(pin.cache_key(init_pcoll), 'init_pcoll_' + str(
- id(init_pcoll)) + '_ref_PCollection_PCollection_1_' + str(id(
+ id(init_pcoll)) + '_ref_PCollection_PCollection_10_' + str(id(
init_pcoll.producer)))
self.assertEqual(pin.cache_key(squares), 'squares_' + str(
- id(squares)) + '_ref_PCollection_PCollection_2_' + str(id(
+ id(squares)) + '_ref_PCollection_PCollection_11_' + str(id(
squares.producer)))
self.assertEqual(pin.cache_key(cubes), 'cubes_' + str(
- id(cubes)) + '_ref_PCollection_PCollection_3_' + str(id(
+ id(cubes)) + '_ref_PCollection_PCollection_12_' + str(id(
cubes.producer)))
def test_cacheables(self):
@@ -159,21 +159,21 @@
pin._cacheable_key(init_pcoll): {
'var': 'init_pcoll',
'version': str(id(init_pcoll)),
- 'pcoll_id': 'ref_PCollection_PCollection_1',
+ 'pcoll_id': 'ref_PCollection_PCollection_10',
'producer_version': str(id(init_pcoll.producer)),
'pcoll': init_pcoll
},
pin._cacheable_key(squares): {
'var': 'squares',
'version': str(id(squares)),
- 'pcoll_id': 'ref_PCollection_PCollection_2',
+ 'pcoll_id': 'ref_PCollection_PCollection_11',
'producer_version': str(id(squares.producer)),
'pcoll': squares
},
pin._cacheable_key(cubes): {
'var': 'cubes',
'version': str(id(cubes)),
- 'pcoll_id': 'ref_PCollection_PCollection_3',
+ 'pcoll_id': 'ref_PCollection_PCollection_12',
'producer_version': str(id(cubes.producer)),
'pcoll': cubes
}
@@ -283,11 +283,11 @@
# Mock as if cacheable PCollections are cached.
init_pcoll_cache_key = 'init_pcoll_' + str(
- id(init_pcoll)) + '_ref_PCollection_PCollection_1_' + str(id(
+ id(init_pcoll)) + '_ref_PCollection_PCollection_10_' + str(id(
init_pcoll.producer))
self._mock_write_cache(init_pcoll, init_pcoll_cache_key)
second_pcoll_cache_key = 'second_pcoll_' + str(
- id(second_pcoll)) + '_ref_PCollection_PCollection_2_' + str(id(
+ id(second_pcoll)) + '_ref_PCollection_PCollection_11_' + str(id(
second_pcoll.producer))
self._mock_write_cache(second_pcoll, second_pcoll_cache_key)
ie.current_env().cache_manager().exists = MagicMock(return_value=True)
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
index ea9d02b..71343e5 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner.py
@@ -385,9 +385,6 @@
self._profiler_factory = profiler.Profile.factory_from_options(
options.view_as(pipeline_options.ProfilingOptions))
- if 'use_sdf_bounded_source' in experiments:
- pipeline.replace_all(DataflowRunner._SDF_PTRANSFORM_OVERRIDES)
-
self._latest_run_result = self.run_via_runner_api(pipeline.to_runner_api(
default_environment=self._default_environment))
return self._latest_run_result
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
index 23480ce..ef09b1f 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py
@@ -271,8 +271,10 @@
('B', 'b', 3)]
with self.create_pipeline() as p:
- assert_that(p | beam.Create(inputs) | beam.ParDo(AddIndex()),
- equal_to(expected))
+ # TODO(BEAM-8893): Allow the reshuffle.
+ assert_that(
+ p | beam.Create(inputs, reshuffle=False) | beam.ParDo(AddIndex()),
+ equal_to(expected))
@unittest.skip('TestStream not yet supported')
def test_teststream_pardo_timers(self):
@@ -417,7 +419,8 @@
with self.create_pipeline() as p:
actual = (
p
- | beam.Create(elements)
+ # TODO(BEAM-8893): Allow the reshuffle.
+ | beam.Create(elements, reshuffle=False)
# Send even and odd elements to different windows.
| beam.Map(lambda e: window.TimestampedValue(e, ord(e) % 2))
| beam.WindowInto(window.FixedWindows(1) if windowed
@@ -777,8 +780,10 @@
self, monitoring_infos, urn, labels, value=None, ge_value=None):
# TODO(ajamato): Consider adding a matcher framework
found = 0
+ matches = []
for mi in monitoring_infos:
if has_urn_and_labels(mi, urn, labels):
+ matches.append(mi.metric.counter_data.int64_value)
if ge_value is not None:
if mi.metric.counter_data.int64_value >= ge_value:
found = found + 1
@@ -790,8 +795,8 @@
ge_value_str = {'ge_value' : ge_value} if ge_value else ''
value_str = {'value' : value} if value else ''
self.assertEqual(
- 1, found, "Found (%s) Expected only 1 monitoring_info for %s." %
- (found, (urn, labels, value_str, ge_value_str),))
+ 1, found, "Found (%s, %s) Expected only 1 monitoring_info for %s." %
+ (found, matches, (urn, labels, value_str, ge_value_str),))
def assert_has_distribution(
self, monitoring_infos, urn, labels,
@@ -833,10 +838,7 @@
(found, (urn, labels, str(description)),))
def create_pipeline(self):
- p = beam.Pipeline(runner=fn_api_runner.FnApiRunner())
- # TODO(BEAM-8448): Fix these tests.
- p.options.view_as(DebugOptions).experiments.remove('beam_fn_api')
- return p
+ return beam.Pipeline(runner=fn_api_runner.FnApiRunner())
def test_element_count_metrics(self):
class GenerateTwoOutputs(beam.DoFn):
@@ -854,7 +856,8 @@
# Produce enough elements to make sure byte sampling occurs.
num_source_elems = 100
- pcoll = p | beam.Create(['a%d' % i for i in range(num_source_elems)])
+ pcoll = p | beam.Create(
+ ['a%d' % i for i in range(num_source_elems)], reshuffle=False)
# pylint: disable=expression-not-assigned
pardo = ('StepThatDoesTwoOutputs' >> beam.ParDo(
@@ -883,13 +886,14 @@
and
monitoring_infos.PCOLLECTION_LABEL not in x.labels])
try:
- labels = {monitoring_infos.PCOLLECTION_LABEL : 'Impulse'}
+ labels = {
+ monitoring_infos.PCOLLECTION_LABEL : 'ref_PCollection_PCollection_1'}
self.assert_has_counter(
counters, monitoring_infos.ELEMENT_COUNT_URN, labels, 1)
- # Create/Read, "out" output.
+ # Create output.
labels = {monitoring_infos.PCOLLECTION_LABEL :
- 'ref_PCollection_PCollection_1'}
+ 'ref_PCollection_PCollection_3'}
self.assert_has_counter(
counters,
monitoring_infos.ELEMENT_COUNT_URN, labels, num_source_elems)
@@ -902,7 +906,7 @@
# GenerateTwoOutputs, main output.
labels = {monitoring_infos.PCOLLECTION_LABEL :
- 'ref_PCollection_PCollection_2'}
+ 'ref_PCollection_PCollection_4'}
self.assert_has_counter(
counters,
monitoring_infos.ELEMENT_COUNT_URN, labels, num_source_elems)
@@ -915,7 +919,7 @@
# GenerateTwoOutputs, "SecondOutput" output.
labels = {monitoring_infos.PCOLLECTION_LABEL :
- 'ref_PCollection_PCollection_3'}
+ 'ref_PCollection_PCollection_5'}
self.assert_has_counter(
counters,
monitoring_infos.ELEMENT_COUNT_URN, labels, 2 * num_source_elems)
@@ -928,7 +932,7 @@
# GenerateTwoOutputs, "ThirdOutput" output.
labels = {monitoring_infos.PCOLLECTION_LABEL :
- 'ref_PCollection_PCollection_4'}
+ 'ref_PCollection_PCollection_6'}
self.assert_has_counter(
counters,
monitoring_infos.ELEMENT_COUNT_URN, labels, num_source_elems)
@@ -943,7 +947,7 @@
# outputs.
# Flatten/Read, main output.
labels = {monitoring_infos.PCOLLECTION_LABEL :
- 'ref_PCollection_PCollection_5'}
+ 'ref_PCollection_PCollection_7'}
self.assert_has_counter(
counters,
monitoring_infos.ELEMENT_COUNT_URN, labels, 4 * num_source_elems)
@@ -956,7 +960,7 @@
# PassThrough, main output
labels = {monitoring_infos.PCOLLECTION_LABEL :
- 'ref_PCollection_PCollection_6'}
+ 'ref_PCollection_PCollection_8'}
self.assert_has_counter(
counters,
monitoring_infos.ELEMENT_COUNT_URN, labels, 4 * num_source_elems)
@@ -969,7 +973,7 @@
# PassThrough2, main output
labels = {monitoring_infos.PCOLLECTION_LABEL :
- 'ref_PCollection_PCollection_7'}
+ 'ref_PCollection_PCollection_9'}
self.assert_has_counter(
counters,
monitoring_infos.ELEMENT_COUNT_URN, labels, num_source_elems)
@@ -1014,7 +1018,8 @@
namespace = split[0]
name = ':'.join(split[1:])
assert_counter_exists(
- all_metrics_via_montoring_infos, namespace, name, step='Create/Read')
+ all_metrics_via_montoring_infos, namespace, name,
+ step='Create/Impulse')
assert_counter_exists(
all_metrics_via_montoring_infos, namespace, name, step='MyStep')
@@ -1027,7 +1032,8 @@
p = self.create_pipeline()
_ = (p
- | beam.Create([0, 0, 0, 5e-3 * DEFAULT_SAMPLING_PERIOD_MS])
+ | beam.Create(
+ [0, 0, 0, 5e-3 * DEFAULT_SAMPLING_PERIOD_MS], reshuffle=False)
| beam.Map(time.sleep)
| beam.Map(lambda x: ('key', x))
| beam.GroupByKey()
@@ -1051,13 +1057,13 @@
# Test the DEPRECATED legacy metrics
pregbk_metrics, postgbk_metrics = list(
res._metrics_by_stage.values())
- if 'Create/Read' not in pregbk_metrics.ptransforms:
+ if 'Create/Map(decode)' not in pregbk_metrics.ptransforms:
# The metrics above are actually unordered. Swap.
pregbk_metrics, postgbk_metrics = postgbk_metrics, pregbk_metrics
self.assertEqual(
4,
- pregbk_metrics.ptransforms['Create/Read']
- .processed_elements.measured.output_element_counts['out'])
+ pregbk_metrics.ptransforms['Create/Map(decode)']
+ .processed_elements.measured.output_element_counts['None'])
self.assertEqual(
4,
pregbk_metrics.ptransforms['Map(sleep)']
@@ -1089,20 +1095,20 @@
self.assertEqual(2, len(res._monitoring_infos_by_stage))
pregbk_mis, postgbk_mis = list(res._monitoring_infos_by_stage.values())
- if not has_mi_for_ptransform(pregbk_mis, 'Create/Read'):
+ if not has_mi_for_ptransform(pregbk_mis, 'Create/Map(decode)'):
# The monitoring infos above are actually unordered. Swap.
pregbk_mis, postgbk_mis = postgbk_mis, pregbk_mis
# pregbk monitoring infos
labels = {monitoring_infos.PCOLLECTION_LABEL :
- 'ref_PCollection_PCollection_1'}
+ 'ref_PCollection_PCollection_3'}
self.assert_has_counter(
pregbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=4)
self.assert_has_distribution(
pregbk_mis, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels)
labels = {monitoring_infos.PCOLLECTION_LABEL :
- 'ref_PCollection_PCollection_2'}
+ 'ref_PCollection_PCollection_4'}
self.assert_has_counter(
pregbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=4)
self.assert_has_distribution(
@@ -1115,14 +1121,14 @@
# postgbk monitoring infos
labels = {monitoring_infos.PCOLLECTION_LABEL :
- 'ref_PCollection_PCollection_6'}
+ 'ref_PCollection_PCollection_8'}
self.assert_has_counter(
postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=1)
self.assert_has_distribution(
postgbk_mis, monitoring_infos.SAMPLED_BYTE_SIZE_URN, labels)
labels = {monitoring_infos.PCOLLECTION_LABEL :
- 'ref_PCollection_PCollection_7'}
+ 'ref_PCollection_PCollection_9'}
self.assert_has_counter(
postgbk_mis, monitoring_infos.ELEMENT_COUNT_URN, labels, value=5)
self.assert_has_distribution(
@@ -1406,7 +1412,7 @@
with self.create_pipeline() as p:
grouped = (
p
- | beam.Create(elements)
+ | beam.Create(elements, reshuffle=False)
| 'SDF' >> beam.ParDo(EnumerateSdf()))
flat = grouped | beam.FlatMap(lambda x: x)
assert_that(flat, equal_to(expected))
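
The renumbered ref_PCollection labels and renamed steps in this file (and in the interactive-runner tests above) follow mechanically from Create's new expansion: one Create/Read step becomes Create/Impulse, Create/FlatMap(<lambda>) and Create/Map(decode), so every downstream PCollection id shifts and per-step metrics move accordingly. Illustrative shape for a pipeline headed by Create(..., reshuffle=False); ids are assigned per pipeline, not fixed constants:

    # Create/Impulse           -> ref_PCollection_PCollection_1
    # Create/FlatMap(<lambda>) -> ref_PCollection_PCollection_2
    # Create/Map(decode)       -> ref_PCollection_PCollection_3   (was ..._1)
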
diff --git a/sdks/python/apache_beam/testing/test_stream_test.py b/sdks/python/apache_beam/testing/test_stream_test.py
index a1c462a..15d1770 100644
--- a/sdks/python/apache_beam/testing/test_stream_test.py
+++ b/sdks/python/apache_beam/testing/test_stream_test.py
@@ -274,12 +274,12 @@
elm=beam.DoFn.ElementParam,
ts=beam.DoFn.TimestampParam,
side=beam.DoFn.SideInputParam):
- yield (elm, ts, side)
+ yield (elm, ts, sorted(side))
records = (main_stream # pylint: disable=unused-variable
| beam.ParDo(RecordFn(), beam.pvalue.AsList(side)))
- assert_that(records, equal_to([('e', Timestamp(10), [2, 1, 4])]))
+ assert_that(records, equal_to([('e', Timestamp(10), [1, 2, 4])]))
p.run()
diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py
index 8794d2a..3169d53 100644
--- a/sdks/python/apache_beam/transforms/core.py
+++ b/sdks/python/apache_beam/transforms/core.py
@@ -38,7 +38,6 @@
from apache_beam.coders import typecoders
from apache_beam.internal import pickler
from apache_beam.internal import util
-from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import TypeOptions
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
@@ -2511,38 +2510,32 @@
def expand(self, pbegin):
assert isinstance(pbegin, pvalue.PBegin)
- # Must guard against this as some legacy runners don't implement impulse.
- debug_options = pbegin.pipeline._options.view_as(DebugOptions)
- fn_api = (debug_options.experiments
- and 'beam_fn_api' in debug_options.experiments)
- if fn_api:
- coder = typecoders.registry.get_coder(self.get_output_type())
- serialized_values = [coder.encode(v) for v in self.values]
- reshuffle = self.reshuffle
- # Avoid the "redistributing" reshuffle for 0 and 1 element Creates.
- # These special cases are often used in building up more complex
- # transforms (e.g. Write).
+ coder = typecoders.registry.get_coder(self.get_output_type())
+ serialized_values = [coder.encode(v) for v in self.values]
+ reshuffle = self.reshuffle
+ # Avoid the "redistributing" reshuffle for 0 and 1 element Creates.
+ # These special cases are often used in building up more complex
+ # transforms (e.g. Write).
- class MaybeReshuffle(PTransform):
- def expand(self, pcoll):
- if len(serialized_values) > 1 and reshuffle:
- from apache_beam.transforms.util import Reshuffle
- return pcoll | Reshuffle()
- else:
- return pcoll
- return (
- pbegin
- | Impulse()
- | FlatMap(lambda _: serialized_values)
- | MaybeReshuffle()
- | Map(coder.decode).with_output_types(self.get_output_type()))
- else:
- self.pipeline = pbegin.pipeline
- from apache_beam.io import iobase
- coder = typecoders.registry.get_coder(self.get_output_type())
- source = self._create_source_from_iterable(self.values, coder)
- return (pbegin.pipeline
- | iobase.Read(source).with_output_types(self.get_output_type()))
+ class MaybeReshuffle(PTransform):
+ def expand(self, pcoll):
+ if len(serialized_values) > 1 and reshuffle:
+ from apache_beam.transforms.util import Reshuffle
+ return pcoll | Reshuffle()
+ else:
+ return pcoll
+ return (
+ pbegin
+ | Impulse()
+ | FlatMap(lambda _: serialized_values).with_output_types(bytes)
+ | MaybeReshuffle().with_output_types(bytes)
+ | Map(coder.decode).with_output_types(self.get_output_type()))
+
+ def as_read(self):
+ from apache_beam.io import iobase
+ coder = typecoders.registry.get_coder(self.get_output_type())
+ source = self._create_source_from_iterable(self.values, coder)
+ return iobase.Read(source).with_output_types(self.get_output_type())
def get_windowing(self, unused_inputs):
return Windowing(GlobalWindows())
@@ -2558,6 +2551,7 @@
return _CreateSource(serialized_values, coder)
+@typehints.with_output_types(bytes)
class Impulse(PTransform):
"""Impulse primitive."""
diff --git a/sdks/python/apache_beam/transforms/ptransform.py b/sdks/python/apache_beam/transforms/ptransform.py
index 380708d..0c9459f 100644
--- a/sdks/python/apache_beam/transforms/ptransform.py
+++ b/sdks/python/apache_beam/transforms/ptransform.py
@@ -506,9 +506,10 @@
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.transforms.core import Create
# pylint: enable=wrong-import-order, wrong-import-position
- replacements = {id(v): p | 'CreatePInput%s' % ix >> Create(v)
- for ix, v in enumerate(pvalues)
- if not isinstance(v, pvalue.PValue) and v is not None}
+ replacements = {
+ id(v): p | 'CreatePInput%s' % ix >> Create(v, reshuffle=False)
+ for ix, v in enumerate(pvalues)
+ if not isinstance(v, pvalue.PValue) and v is not None}
pvalueish = _SetInputPValues().visit(pvalueish, replacements)
self.pipeline = p
result = p.apply(self, pvalueish, label)
diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py
index ad87082..ffb245c 100644
--- a/sdks/python/apache_beam/transforms/ptransform_test.py
+++ b/sdks/python/apache_beam/transforms/ptransform_test.py
@@ -84,29 +84,29 @@
str(PTransform()))
pa = TestPipeline()
- res = pa | 'ALabel' >> beam.Create([1, 2])
- self.assertEqual('AppliedPTransform(ALabel/Read, Read)',
+ res = pa | 'ALabel' >> beam.Impulse()
+ self.assertEqual('AppliedPTransform(ALabel, Impulse)',
str(res.producer))
pc = TestPipeline()
- res = pc | beam.Create([1, 2])
+ res = pc | beam.Impulse()
inputs_tr = res.producer.transform
inputs_tr.inputs = ('ci',)
self.assertEqual(
- """<Read(PTransform) label=[Read] inputs=('ci',)>""",
+ "<Impulse(PTransform) label=[Impulse] inputs=('ci',)>",
str(inputs_tr))
pd = TestPipeline()
- res = pd | beam.Create([1, 2])
+ res = pd | beam.Impulse()
side_tr = res.producer.transform
side_tr.side_inputs = (4,)
self.assertEqual(
- '<Read(PTransform) label=[Read] side_inputs=(4,)>',
+ '<Impulse(PTransform) label=[Impulse] side_inputs=(4,)>',
str(side_tr))
inputs_tr.side_inputs = ('cs',)
self.assertEqual(
- """<Read(PTransform) label=[Read] """
+ """<Impulse(PTransform) label=[Impulse] """
"""inputs=('ci',) side_inputs=('cs',)>""",
str(inputs_tr))
@@ -495,7 +495,7 @@
pipeline = TestPipeline()
pcoll = pipeline | 'start' >> beam.Create(
[(1, 1), (2, 1), (3, 1), (1, 2), (2, 2), (1, 3)])
- result = pcoll | 'Group' >> beam.GroupByKey()
+ result = pcoll | 'Group' >> beam.GroupByKey() | _SortLists
assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2]), (3, [1])]))
pipeline.run()
@@ -562,7 +562,7 @@
created = pipeline | 'A' >> beam.Create(contents)
partitioned = created | 'B' >> beam.Partition(lambda x, n: len(x) % n, 3)
flattened = partitioned | 'C' >> beam.Flatten()
- grouped = flattened | 'D' >> beam.GroupByKey()
+ grouped = flattened | 'D' >> beam.GroupByKey() | _SortLists
assert_that(grouped, equal_to([('aa', [1, 2]), ('bb', [2])]))
pipeline.run()
@@ -654,7 +654,7 @@
[('a', 1), ('a', 2), ('b', 3), ('c', 4)])
pcoll_2 = pipeline | 'Start 2' >> beam.Create(
[('a', 5), ('a', 6), ('c', 7), ('c', 8)])
- result = (pcoll_1, pcoll_2) | beam.CoGroupByKey()
+ result = (pcoll_1, pcoll_2) | beam.CoGroupByKey() | _SortLists
assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
('b', ([3], [])),
('c', ([4], [7, 8]))]))
@@ -667,6 +667,7 @@
pcoll_2 = pipeline | 'Start 2' >> beam.Create(
[('a', 5), ('a', 6), ('c', 7), ('c', 8)])
result = [pc for pc in (pcoll_1, pcoll_2)] | beam.CoGroupByKey()
+ result |= _SortLists
assert_that(result, equal_to([('a', ([1, 2], [5, 6])),
('b', ([3], [])),
('c', ([4], [7, 8]))]))
@@ -679,6 +680,7 @@
pcoll_2 = pipeline | 'Start 2' >> beam.Create(
[('a', 5), ('a', 6), ('c', 7), ('c', 8)])
result = {'X': pcoll_1, 'Y': pcoll_2} | beam.CoGroupByKey()
+ result |= _SortLists
assert_that(result, equal_to([('a', {'X': [1, 2], 'Y': [5, 6]}),
('b', {'X': [3], 'Y': []}),
('c', {'X': [4], 'Y': [7, 8]})]))
@@ -760,8 +762,9 @@
([1, 2, 3], [100]) | beam.Flatten())
join_input = ([('k', 'a')],
[('k', 'b'), ('k', 'c')])
- self.assertCountEqual([('k', (['a'], ['b', 'c']))],
- join_input | beam.CoGroupByKey())
+ self.assertCountEqual(
+ [('k', (['a'], ['b', 'c']))],
+ join_input | beam.CoGroupByKey() | _SortLists)
def test_multi_input_ptransform(self):
class DisjointUnion(PTransform):
@@ -908,7 +911,9 @@
def check_label(self, ptransform, expected_label):
pipeline = TestPipeline()
pipeline | 'Start' >> beam.Create([('a', 1)]) | ptransform
- actual_label = sorted(pipeline.applied_labels - {'Start', 'Start/Read'})[0]
+ actual_label = sorted(
+ label for label in pipeline.applied_labels
+ if not label.startswith('Start'))[0]
self.assertEqual(expected_label, re.sub(r'\d{3,}', '#', actual_label))
def test_default_labels(self):
@@ -1365,11 +1370,12 @@
| 'T' >> beam.Create(['t', 'e', 's', 't', 'i', 'n', 'g'])
.with_output_types(str)
| 'GenKeys' >> beam.Map(group_with_upper_ord)
- | 'O' >> beam.GroupByKey())
+ | 'O' >> beam.GroupByKey()
+ | _SortLists)
assert_that(result, equal_to([(1, ['g']),
- (3, ['s', 'i', 'n']),
- (4, ['t', 'e', 't'])]))
+ (3, ['i', 'n', 's']),
+ (4, ['e', 't', 't'])]))
self.p.run()
def test_pipeline_checking_satisfied_but_run_time_types_violate(self):
@@ -1415,7 +1421,8 @@
result = (self.p
| 'Nums' >> beam.Create(range(5)).with_output_types(int)
| 'IsEven' >> beam.Map(is_even_as_key)
- | 'Parity' >> beam.GroupByKey())
+ | 'Parity' >> beam.GroupByKey()
+ | _SortLists)
assert_that(result, equal_to([(False, [1, 3]), (True, [0, 2, 4])]))
self.p.run()
@@ -1429,7 +1436,7 @@
# passed instead.
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create([1, 2, 3])
+ | beam.Create([1, 1, 1])
| ('ToInt' >> beam.FlatMap(lambda x: [int(x)])
.with_input_types(str).with_output_types(int)))
self.p.run()
@@ -1474,7 +1481,7 @@
int)).get_type_hints())
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create([1, 2, 3])
+ | beam.Create([1, 1, 1])
| ('ToInt' >> beam.FlatMap(lambda x: [float(x)])
.with_input_types(int).with_output_types(int))
)
@@ -1681,7 +1688,7 @@
with self.assertRaises(typehints.TypeCheckError) as e:
(self.p
- | beam.Create(range(3)).with_output_types(int)
+ | beam.Create([0]).with_output_types(int)
| ('SortJoin' >> beam.CombineGlobally(lambda s: ''.join(sorted(s)))
.with_input_types(str).with_output_types(str)))
self.p.run()
@@ -2238,5 +2245,19 @@
p.run()
+def _sort_lists(result):
+ if isinstance(result, list):
+ return sorted(result)
+ elif isinstance(result, tuple):
+ return tuple(_sort_lists(e) for e in result)
+ elif isinstance(result, dict):
+ return {k: _sort_lists(v) for k, v in result.items()}
+ else:
+ return result
+
+
+_SortLists = beam.Map(_sort_lists)
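
_SortLists exists because grouped values arrive in no particular order once inputs are reshuffled: it recursively sorts lists while leaving tuple/dict shapes and non-list leaves intact, and it composes as an ordinary stage. Using the helper defined above:

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that, equal_to

    with TestPipeline() as p:
        pcoll = p | beam.Create([(1, 1), (2, 1), (1, 2), (2, 2), (1, 3)])
        result = pcoll | 'Group' >> beam.GroupByKey() | _SortLists
        assert_that(result, equal_to([(1, [1, 2, 3]), (2, [1, 2])]))
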
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/transforms/userstate_test.py b/sdks/python/apache_beam/transforms/userstate_test.py
index 21ef0ec..601a1d4 100644
--- a/sdks/python/apache_beam/transforms/userstate_test.py
+++ b/sdks/python/apache_beam/transforms/userstate_test.py
@@ -521,7 +521,7 @@
('key', 2),
('key', 3),
('key', 4),
- ('key', 3)])
+ ('key', 3)], reshuffle=False)
actual_values = (values
| beam.ParDo(SetStatefulDoFn()))
diff --git a/sdks/python/apache_beam/transforms/util.py b/sdks/python/apache_beam/transforms/util.py
index 7a87e60..c7f273a 100644
--- a/sdks/python/apache_beam/transforms/util.py
+++ b/sdks/python/apache_beam/transforms/util.py
@@ -25,6 +25,7 @@
import contextlib
import random
import re
+import sys
import time
import typing
import warnings
@@ -34,6 +35,7 @@
from builtins import zip
from future.utils import itervalues
+from past.builtins import long
from apache_beam import coders
from apache_beam import typehints
@@ -648,7 +650,7 @@
key, windowed_values = element
return [wv.with_value((key, wv.value)) for wv in windowed_values]
- ungrouped = pcoll | Map(reify_timestamps)
+ ungrouped = pcoll | Map(reify_timestamps).with_output_types(typing.Any)
# TODO(BEAM-8104) Using global window as one of the standard window.
# This is to mitigate the Dataflow Java Runner Harness limitation to
@@ -660,7 +662,7 @@
timestamp_combiner=TimestampCombiner.OUTPUT_AT_EARLIEST)
result = (ungrouped
| GroupByKey()
- | FlatMap(restore_timestamps))
+ | FlatMap(restore_timestamps).with_output_types(typing.Any))
result._windowing = windowing_saved
return result
@@ -680,10 +682,16 @@
"""
def expand(self, pcoll):
+ if sys.version_info >= (3,):
+ KeyedT = typing.Tuple[int, T]
+ else:
+ KeyedT = typing.Tuple[long, T] # pylint: disable=long-builtin
return (pcoll
| 'AddRandomKeys' >> Map(lambda t: (random.getrandbits(32), t))
+ .with_input_types(T).with_output_types(KeyedT)
| ReshufflePerKey()
- | 'RemoveRandomKeys' >> Map(lambda t: t[1]))
+ | 'RemoveRandomKeys' >> Map(lambda t: t[1])
+ .with_input_types(KeyedT).with_output_types(T))
def to_runner_api_parameter(self, unused_context):
return common_urns.composites.RESHUFFLE.urn, None
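
Reshuffle's stages are now explicitly typed: AddRandomKeys maps T to Tuple[int, T] (Tuple[long, T] on Python 2, hence the past.builtins import) and RemoveRandomKeys inverts it, so element type information survives the key round trip instead of degrading to Any; ReifyTimestamps' inner Map/FlatMap are pinned to Any since they carry runner-internal windowed pairs. The user-visible contract is unchanged: contents are preserved, order is not.

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that, equal_to

    with TestPipeline() as p:
        out = p | beam.Create([1, 2, 3]) | beam.Reshuffle()
        # equal_to compares as multisets, so this holds regardless of order.
        assert_that(out, equal_to([1, 2, 3]))
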
diff --git a/sdks/python/apache_beam/transforms/util_test.py b/sdks/python/apache_beam/transforms/util_test.py
index 74829e5..58cf243 100644
--- a/sdks/python/apache_beam/transforms/util_test.py
+++ b/sdks/python/apache_beam/transforms/util_test.py
@@ -100,7 +100,7 @@
with TestPipeline() as p:
res = (
p
- | beam.Create(range(47))
+ | beam.Create(range(47), reshuffle=False)
| beam.Map(lambda t: window.TimestampedValue(t, t))
| beam.WindowInto(window.FixedWindows(30))
| util.BatchElements(
@@ -351,7 +351,8 @@
after_gbk = (pipeline
| beam.Create(data)
- | beam.GroupByKey())
+ | beam.GroupByKey()
+ | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
assert_that(after_gbk, equal_to(expected_result), label='after_gbk')
after_reshuffle = after_gbk | beam.Reshuffle()
assert_that(after_reshuffle, equal_to(expected_result),
@@ -435,7 +436,8 @@
before_reshuffle = (pipeline
| beam.Create(data)
| beam.WindowInto(GlobalWindows())
- | beam.GroupByKey())
+ | beam.GroupByKey()
+ | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
assert_that(before_reshuffle, equal_to(expected_data),
label='before_reshuffle')
after_reshuffle = before_reshuffle | beam.Reshuffle()
@@ -452,7 +454,8 @@
| beam.Create(data)
| beam.WindowInto(SlidingWindows(
size=window_size, period=1))
- | beam.GroupByKey())
+ | beam.GroupByKey()
+ | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
assert_that(before_reshuffle, equal_to(expected_data),
label='before_reshuffle')
after_reshuffle = before_reshuffle | beam.Reshuffle()
@@ -471,7 +474,8 @@
before_reshuffle = (pipeline
| beam.Create(data)
| beam.WindowInto(GlobalWindows())
- | beam.GroupByKey())
+ | beam.GroupByKey()
+ | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
assert_that(before_reshuffle, equal_to(expected_data),
label='before_reshuffle')
after_reshuffle = before_reshuffle | beam.Reshuffle()
diff --git a/sdks/python/apache_beam/transforms/window_test.py b/sdks/python/apache_beam/transforms/window_test.py
index a405948..30430cc 100644
--- a/sdks/python/apache_beam/transforms/window_test.py
+++ b/sdks/python/apache_beam/transforms/window_test.py
@@ -196,6 +196,7 @@
result = (pcoll
| 'w' >> WindowInto(SlidingWindows(period=2, size=4))
| GroupByKey()
+ | beam.MapTuple(lambda k, vs: (k, sorted(vs)))
| reify_windows)
expected = [('key @ [-2.0, 2.0)', [1]),
('key @ [0.0, 4.0)', [1, 2, 3]),
@@ -222,7 +223,8 @@
| Map(lambda x_t: TimestampedValue(x_t[0], x_t[1]))
| 'w' >> WindowInto(FixedWindows(5))
| Map(lambda v: ('key', v))
- | GroupByKey())
+ | GroupByKey()
+ | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
assert_that(result, equal_to([('key', [0, 1, 2, 3, 4]),
('key', [5, 6, 7, 8, 9])]))
@@ -237,7 +239,8 @@
| 'rewindow' >> WindowInto(FixedWindows(5))
| 'rewindow2' >> WindowInto(FixedWindows(5))
| Map(lambda v: ('key', v))
- | GroupByKey())
+ | GroupByKey()
+ | beam.MapTuple(lambda k, vs: (k, sorted(vs))))
assert_that(result, equal_to([('key', sorted([0, 1, 2, 3, 4] * 3)),
('key', sorted([5, 6, 7, 8, 9] * 3))]))