blob: a3f91c068f2c08d8167522c1a6482765cebaa2df [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Tests for apache_beam.runners.interactive.pipeline_instrument."""
# pytype: skip-file
import tempfile
import unittest
import apache_beam as beam
from apache_beam import coders
from apache_beam.pipeline import PipelineVisitor
from apache_beam.runners.interactive import cache_manager as cache
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import pipeline_instrument as instr
from apache_beam.runners.interactive import interactive_runner
from apache_beam.runners.interactive.caching.streaming_cache import StreamingCache
from apache_beam.runners.interactive.testing.pipeline_assertion import assert_pipeline_equal
from apache_beam.runners.interactive.testing.pipeline_assertion import assert_pipeline_proto_contain_top_level_transform
from apache_beam.runners.interactive.testing.pipeline_assertion import assert_pipeline_proto_equal
from apache_beam.runners.interactive.testing.pipeline_assertion import \
assert_pipeline_proto_not_contain_top_level_transform
from apache_beam.runners.interactive.testing.test_cache_manager import InMemoryCache
from apache_beam.testing.test_stream import TestStream
class PipelineInstrumentTest(unittest.TestCase):
def setUp(self):
  """Replace the interactive environment with a new one before each test."""
  ie.new_env()
def cache_key_of(self, name, pcoll):
  """Return the repr of the instr.CacheKey expected for a PCollection.

  Args:
    name: variable name the PCollection is referred to by.
    pcoll: the PCollection; its id(), the id() of its producer and the id()
      of its pipeline are stringified and fed into the key.
  """
  version = str(id(pcoll))
  producer_version = str(id(pcoll.producer))
  pipeline_id = str(id(pcoll.pipeline))
  key = instr.CacheKey(name, version, producer_version, pipeline_id)
  return repr(key)
def test_pcolls_to_pcoll_id(self):
  """pcolls_to_pcoll_id maps str(PCollection) to a pcoll id string."""
  p = beam.Pipeline(interactive_runner.InteractiveRunner())
  ie.current_env().set_cache_manager(InMemoryCache(), p)
  # pylint: disable=range-builtin-not-iterating
  init_pcoll = p | 'Init Create' >> beam.Impulse()
  # Only the context returned next to the proto is needed here.
  _, ctx = p.to_runner_api(return_context=True)

  actual_mapping = instr.pcolls_to_pcoll_id(p, ctx)
  expected_mapping = {str(init_pcoll): 'ref_PCollection_PCollection_1'}
  self.assertEqual(actual_mapping, expected_mapping)
def test_cacheable_key_without_version_map(self):
  """With no version map the key is prefixed with id() of the given pcoll."""
  p = beam.Pipeline(interactive_runner.InteractiveRunner())
  ie.current_env().set_cache_manager(InMemoryCache(), p)
  # pylint: disable=range-builtin-not-iterating
  init_pcoll = p | 'Init Create' >> beam.Create(range(10))
  _, ctx = p.to_runner_api(return_context=True)

  pcoll_ids = instr.pcolls_to_pcoll_id(p, ctx)
  actual_key = instr.cacheable_key(init_pcoll, pcoll_ids)
  expected_key = str(id(init_pcoll)) + '_ref_PCollection_PCollection_8'
  self.assertEqual(actual_key, expected_key)
def test_cacheable_key_with_version_map(self):
  """With a version map the key prefix comes from the map, not the pcoll."""
  p = beam.Pipeline(interactive_runner.InteractiveRunner())
  ie.current_env().set_cache_manager(InMemoryCache(), p)
  # pylint: disable=range-builtin-not-iterating
  init_pcoll = p | 'Init Create' >> beam.Create(range(10))

  # At execution time the pipeline object is normally a different but
  # equivalent instance of the one the user built. The instrument has to tell
  # whether the user's instance changed in the interactive env while it
  # mutates the execution instance. The version map links pcoll ids back to
  # the PCollection instances of the user's pipeline so that a change in
  # evaluation since the last execution can be detected.
  p2 = beam.Pipeline(interactive_runner.InteractiveRunner())
  ie.current_env().set_cache_manager(InMemoryCache(), p2)
  # pylint: disable=range-builtin-not-iterating
  init_pcoll_2 = p2 | 'Init Create' >> beam.Create(range(10))
  _, ctx = p2.to_runner_api(return_context=True)

  # Although init_pcoll_2 is the PCollection handed in, the supplied version
  # map must make the key start with id(init_pcoll).
  pcoll_ids = instr.pcolls_to_pcoll_id(p2, ctx)
  version_map = {'ref_PCollection_PCollection_8': str(id(init_pcoll))}
  actual_key = instr.cacheable_key(init_pcoll_2, pcoll_ids, version_map)
  expected_key = str(id(init_pcoll)) + '_ref_PCollection_PCollection_8'
  self.assertEqual(actual_key, expected_key)
def test_cache_key(self):
  """cache_key() of each watched PCollection matches cache_key_of()."""
  p = beam.Pipeline(interactive_runner.InteractiveRunner())
  ie.current_env().set_cache_manager(InMemoryCache(), p)
  # pylint: disable=range-builtin-not-iterating
  init_pcoll = p | 'Init Create' >> beam.Create(range(10))
  squares = init_pcoll | 'Square' >> beam.Map(lambda x: x * x)
  cubes = init_pcoll | 'Cube' >> beam.Map(lambda x: x**3)
  # Watch the local variables, i.e., the Beam pipeline defined. The variable
  # names above are the names the expected keys below are built from.
  ib.watch(locals())

  pipeline_instrument = instr.build_pipeline_instrument(p)
  watched = (
      ('init_pcoll', init_pcoll),
      ('squares', squares),
      ('cubes', cubes),
  )
  for var_name, watched_pcoll in watched:
    self.assertEqual(
        pipeline_instrument.cache_key(watched_pcoll),
        self.cache_key_of(var_name, watched_pcoll))
def test_cacheables(self):
  """The instrument exposes one Cacheable per watched PCollection of p."""
  p = beam.Pipeline(interactive_runner.InteractiveRunner())
  ie.current_env().set_cache_manager(InMemoryCache(), p)
  # pylint: disable=range-builtin-not-iterating
  init_pcoll = p | 'Init Create' >> beam.Create(range(10))
  squares = init_pcoll | 'Square' >> beam.Map(lambda x: x * x)
  cubes = init_pcoll | 'Cube' >> beam.Map(lambda x: x**3)
  ib.watch(locals())

  pipeline_instrument = instr.build_pipeline_instrument(p)

  # TODO(BEAM-7760): The PipelineInstrument cacheables maintains a global list
  # of cacheable PCollections across all pipelines. Here we take the subset of
  # cacheables that only pertain to this test's pipeline.
  cacheables = {}
  for key, cacheable in pipeline_instrument.cacheables.items():
    if cacheable.pcoll.pipeline is p:
      cacheables[key] = cacheable

  def expected_cacheable(var, pcoll, pcoll_id):
    # Versions are the stringified id() of the pcoll and of its producer.
    return instr.Cacheable(
        var=var,
        version=str(id(pcoll)),
        pcoll_id=pcoll_id,
        producer_version=str(id(pcoll.producer)),
        pcoll=pcoll)

  expected = {
      pipeline_instrument._cacheable_key(init_pcoll): expected_cacheable(
          'init_pcoll', init_pcoll, 'ref_PCollection_PCollection_8'),
      pipeline_instrument._cacheable_key(squares): expected_cacheable(
          'squares', squares, 'ref_PCollection_PCollection_9'),
      pipeline_instrument._cacheable_key(cubes): expected_cacheable(
          'cubes', cubes, 'ref_PCollection_PCollection_10'),
  }
  self.assertEqual(cacheables, expected)
def test_has_unbounded_source(self):
  """has_unbounded_sources is truthy for a pipeline reading from PubSub."""
  p = beam.Pipeline(interactive_runner.InteractiveRunner())
  ie.current_env().set_cache_manager(InMemoryCache(), p)
  pubsub_read = beam.io.ReadFromPubSub(
      subscription='projects/fake-project/subscriptions/fake_sub')
  _ = p | 'ReadUnboundedSource' >> pubsub_read
  self.assertTrue(instr.has_unbounded_sources(p))
def test_not_has_unbounded_source(self):
  """has_unbounded_sources is falsy for a pipeline reading a text file."""
  # Imported locally so this method does not rely on a file-level import.
  import os

  p = beam.Pipeline(interactive_runner.InteractiveRunner())
  ie.current_env().set_cache_manager(InMemoryCache(), p)
  # delete=False keeps the file on disk after the handle is closed so that it
  # can be read through its name below.
  with tempfile.NamedTemporaryFile(delete=False) as f:
    f.write(b'test')
  # Because of delete=False nothing removes the file automatically; register
  # the removal so the test no longer leaves a temp file behind on each run,
  # even when an assertion below fails.
  self.addCleanup(os.remove, f.name)
  _ = p | 'ReadBoundedSource' >> beam.io.ReadFromText(f.name)
  self.assertFalse(instr.has_unbounded_sources(p))
def test_background_caching_pipeline_proto(self):
  """The background caching proto only keeps the unbounded sources."""
  p = beam.Pipeline(interactive_runner.InteractiveRunner())
  ie.current_env().set_cache_manager(StreamingCache(cache_dir=None), p)

  # Test that the two ReadFromPubSub are correctly cut out.
  a = p | 'ReadUnboundedSourceA' >> beam.io.ReadFromPubSub(
      subscription='projects/fake-project/subscriptions/fake_sub')
  b = p | 'ReadUnboundedSourceB' >> beam.io.ReadFromPubSub(
      subscription='projects/fake-project/subscriptions/fake_sub')

  # Extra PTransforms downstream of the sources; they are absent from the
  # expected pipeline built further down.
  c = (a, b) | beam.Flatten()
  _ = c | beam.Map(lambda x: x)

  ib.watch(locals())
  instrumenter = instr.build_pipeline_instrument(p)
  actual_pipeline = instrumenter.background_caching_pipeline_proto()

  # Now recreate the expected pipeline by hand: each unbounded source followed
  # by a reify step and a cache write, and nothing else.
  p = beam.Pipeline(interactive_runner.InteractiveRunner())
  ie.current_env().set_cache_manager(StreamingCache(cache_dir=None), p)

  a = p | 'ReadUnboundedSourceA' >> beam.io.ReadFromPubSub(
      subscription='projects/fake-project/subscriptions/fake_sub')
  write_a = 'a' >> cache.WriteCache(
      ie.current_env().get_cache_manager(p), '')
  _ = a | 'reify a' >> beam.Map(lambda _: _) | write_a

  b = p | 'ReadUnboundedSourceB' >> beam.io.ReadFromPubSub(
      subscription='projects/fake-project/subscriptions/fake_sub')
  write_b = 'b' >> cache.WriteCache(
      ie.current_env().get_cache_manager(p), '')
  _ = b | 'reify b' >> beam.Map(lambda _: _) | write_b

  expected_pipeline = p.to_runner_api(return_context=False)
  assert_pipeline_proto_equal(self, expected_pipeline, actual_pipeline)
def _example_pipeline(self, watch=True, bounded=True):
  """Build a two-step example pipeline on an InteractiveRunner.

  Args:
    watch: when truthy, the local variables of this method are passed to
      ib.watch(), so the names `init_pcoll` and `second_pcoll` matter.
    bounded: when truthy the source is beam.Create(range(10)), otherwise a
      ReadFromPubSub on a fake subscription.

  Returns:
    A (pipeline, init_pcoll, second_pcoll) tuple.
  """
  p = beam.Pipeline(interactive_runner.InteractiveRunner())
  ie.current_env().set_cache_manager(InMemoryCache(), p)
  # pylint: disable=range-builtin-not-iterating
  source = (
      beam.Create(range(10)) if bounded else beam.io.ReadFromPubSub(
          subscription='projects/fake-project/subscriptions/fake_sub'))
  init_pcoll = p | 'Init Source' >> source
  second_pcoll = init_pcoll | 'Second' >> beam.Map(lambda x: x * x)
  if watch:
    ib.watch(locals())
  return p, init_pcoll, second_pcoll
def _mock_write_cache(self, pipeline, values, cache_key):
  """Write `values` into the cache manager of `pipeline` under `cache_key`.

  Stands in for what cache.WriteCache would write: both the coder and the
  values are stored under the labels ('full', cache_key).
  """
  labels = ['full', cache_key]
  # Usually, the pcoder will be inferred from `pcoll.element_type`; here the
  # coder registered for `object` is used.
  pcoder = coders.registry.get_coder(object)
  manager = ie.current_env().get_cache_manager(pipeline)
  manager.save_pcoder(pcoder, *labels)
  manager.write(values, *labels)
def test_instrument_example_pipeline_to_write_cache(self):
  """Instrumenting a copy adds the same cache writes as done by hand."""
  # The user-defined instance: its variables are watched.
  p_origin, init_pcoll, second_pcoll = self._example_pipeline()
  # The instance used for execution: built without watching any variable.
  p_copy, _, _ = self._example_pipeline(watch=False)
  # Instrument the copied pipeline.
  pipeline_instrument = instr.build_pipeline_instrument(p_copy)

  # Manually extend the original pipeline with the transforms the instrument
  # is expected to have added to the copy.
  init_pcoll_cache_key = pipeline_instrument.cache_key(init_pcoll)
  write_init = '_WriteCache_' + init_pcoll_cache_key >> cache.WriteCache(
      ie.current_env().get_cache_manager(p_origin), init_pcoll_cache_key)
  _ = init_pcoll | 'reify init' >> beam.Map(lambda _: _) | write_init

  second_pcoll_cache_key = pipeline_instrument.cache_key(second_pcoll)
  write_second = '_WriteCache_' + second_pcoll_cache_key >> cache.WriteCache(
      ie.current_env().get_cache_manager(p_origin), second_pcoll_cache_key)
  _ = second_pcoll | 'reify second' >> beam.Map(lambda _: _) | write_second

  # The 2 pipelines should be the same now.
  assert_pipeline_equal(self, p_copy, p_origin)
def test_instrument_example_pipeline_to_read_cache(self):
  """Instrumenting a copy rewires computed PCollections to cache reads."""
  p_origin, init_pcoll, second_pcoll = self._example_pipeline()
  p_copy, _, _ = self._example_pipeline(watch=False)

  # Mock as if cacheable PCollections are cached.
  init_pcoll_cache_key = self.cache_key_of('init_pcoll', init_pcoll)
  self._mock_write_cache(p_origin, [b'1', b'2', b'3'], init_pcoll_cache_key)
  second_pcoll_cache_key = self.cache_key_of('second_pcoll', second_pcoll)
  self._mock_write_cache(p_origin, [b'1', b'4', b'9'], second_pcoll_cache_key)
  # Mark the completeness of PCollections from the original(user) pipeline.
  ie.current_env().mark_pcollection_computed((init_pcoll, second_pcoll))
  ie.current_env().add_derived_pipeline(p_origin, p_copy)
  instr.build_pipeline_instrument(p_copy)

  read_init = '_ReadCache_' + init_pcoll_cache_key >> cache.ReadCache(
      ie.current_env().get_cache_manager(p_origin), init_pcoll_cache_key)
  cached_init_pcoll = p_origin | read_init | 'unreify' >> beam.Map(lambda _: _)

  # second_pcoll is never used as input and there is no need to read cache.

  class TestReadCacheWireVisitor(PipelineVisitor):
    """Replace init_pcoll with cached_init_pcoll for all occurring inputs."""
    def enter_composite_transform(self, transform_node):
      # Composite transforms get the same treatment as leaf transforms.
      self.visit_transform(transform_node)

    def visit_transform(self, transform_node):
      if not transform_node.inputs:
        return
      transform_node.main_inputs = {
          tag: cached_init_pcoll if pcoll == init_pcoll else pcoll
          for tag, pcoll in dict(transform_node.main_inputs).items()
      }

  v = TestReadCacheWireVisitor()
  p_origin.visit(v)
  assert_pipeline_equal(self, p_origin, p_copy)
def test_find_out_correct_user_pipeline(self):
  """An instrument built from a derived pipeline reports its user pipeline."""
  # The user pipeline of interest; _example_pipeline() watches its variables.
  user_pipeline, _, _ = self._example_pipeline()

  # A distinct pipeline instance rebuilt from the proto of user_pipeline, then
  # registered as derived from it.
  runner_pipeline = beam.pipeline.Pipeline.from_runner_api(
      user_pipeline.to_runner_api(), user_pipeline.runner, options=None)
  ie.current_env().add_derived_pipeline(user_pipeline, runner_pipeline)

  # An unrelated user pipeline that is watched as well.
  irrelevant_user_pipeline = beam.Pipeline(
      interactive_runner.InteractiveRunner())
  ib.watch({'irrelevant_user_pipeline': irrelevant_user_pipeline})

  # Build the instrument from the runner pipeline, not the user pipeline.
  pipeline_instrument = instr.build_pipeline_instrument(runner_pipeline)
  self.assertIs(pipeline_instrument.user_pipeline, user_pipeline)
def test_instrument_example_unbounded_pipeline_to_read_cache(self):
  """Checks instrumentation of a pipeline with one cached unbounded source.
  """
  # Create the pipeline that will be instrumented.
  p_original = beam.Pipeline(interactive_runner.InteractiveRunner())
  ie.current_env().set_cache_manager(
      StreamingCache(cache_dir=None), p_original)
  source_1 = p_original | 'source1' >> beam.io.ReadFromPubSub(
      subscription='projects/fake-project/subscriptions/fake_sub')
  # pylint: disable=possibly-unused-variable
  pcoll_1 = source_1 | 'square1' >> beam.Map(lambda x: x * x)

  # Mock as if cacheable PCollections are cached: every local variable that
  # holds a PCollection gets an empty cache entry under its expected key.
  ib.watch(locals())
  for name, pcoll in locals().items():
    if isinstance(pcoll, beam.pvalue.PCollection):
      cache_key = self.cache_key_of(name, pcoll)
      self._mock_write_cache(p_original, [], cache_key)

  # Instrument the original pipeline to create the pipeline the user will see.
  instrumenter = instr.build_pipeline_instrument(p_original)
  actual_pipeline = beam.Pipeline.from_runner_api(
      proto=instrumenter.instrumented_pipeline_proto(),
      runner=interactive_runner.InteractiveRunner(),
      options=None)

  # Now, build the expected pipeline which replaces the unbounded source with
  # a TestStream.
  source_1_cache_key = self.cache_key_of('source_1', source_1)
  p_expected = beam.Pipeline()
  test_stream = p_expected | TestStream(output_tags=[source_1_cache_key])
  # pylint: disable=expression-not-assigned
  test_stream[source_1_cache_key] | 'square1' >> beam.Map(lambda x: x * x)

  # Test that the TestStream is outputting to the correct PCollection.
  class TestStreamVisitor(PipelineVisitor):
    """Remembers the output tags of the TestStream it comes across."""
    def __init__(self):
      self.output_tags = set()

    def enter_composite_transform(self, transform_node):
      self.visit_transform(transform_node)

    def visit_transform(self, transform_node):
      candidate = transform_node.transform
      if not isinstance(candidate, TestStream):
        return
      self.output_tags = candidate.output_tags

  v = TestStreamVisitor()
  actual_pipeline.visit(v)
  self.assertSetEqual({source_1_cache_key}, v.output_tags)

  # Test that the pipeline is as expected.
  assert_pipeline_proto_equal(
      self,
      p_expected.to_runner_api(),
      instrumenter.instrumented_pipeline_proto())
def test_able_to_cache_intermediate_unbounded_source_pcollection(self):
  """Checks that a source PCollection without a variable is still cached.

  The output of the source below is piped straight into a Map, so no local
  variable refers to it and watch() alone cannot pick it up. watch_sources()
  is expected to cover that case.
  """
  # Create the pipeline that will be instrumented.
  from apache_beam.options.pipeline_options import StandardOptions
  options = StandardOptions(streaming=True)
  streaming_cache_manager = StreamingCache(cache_dir=None)
  p_original = beam.Pipeline(interactive_runner.InteractiveRunner(), options)
  ie.current_env().set_cache_manager(streaming_cache_manager, p_original)

  # pylint: disable=possibly-unused-variable
  source_1 = (
      p_original
      | 'source1' >> beam.io.ReadFromPubSub(
          subscription='projects/fake-project/subscriptions/fake_sub')
      | beam.Map(lambda e: e))

  # Watch but do not cache the PCollections.
  ib.watch(locals())

  # Make sure that sources without a user reference are still cached.
  instr.watch_sources(p_original)

  # Look through everything watched for a variable whose name contains
  # 'synthetic'; within one watched group the search stops at the first hit.
  intermediate_source_pcoll = None
  for watching in ie.current_env().watching():
    watching = list(watching)
    for var, watchable in watching:
      if 'synthetic' in var:
        intermediate_source_pcoll = watchable
        break

  # Instrument the original pipeline to create the pipeline the user will see.
  p_copy = beam.Pipeline.from_runner_api(
      p_original.to_runner_api(),
      runner=interactive_runner.InteractiveRunner(),
      options=options)
  instrumenter = instr.build_pipeline_instrument(p_copy)
  actual_pipeline = beam.Pipeline.from_runner_api(
      proto=instrumenter.instrumented_pipeline_proto(),
      runner=interactive_runner.InteractiveRunner(),
      options=options)

  # Now, build the expected pipeline which replaces the unbounded source with
  # a TestStream.
  synthetic_name = 'synthetic_var_' + str(id(intermediate_source_pcoll))
  intermediate_source_pcoll_cache_key = self.cache_key_of(
      synthetic_name, intermediate_source_pcoll)
  p_expected = beam.Pipeline()
  ie.current_env().set_cache_manager(streaming_cache_manager, p_expected)
  test_stream = p_expected | TestStream(
      output_tags=[intermediate_source_pcoll_cache_key])
  # pylint: disable=expression-not-assigned
  (
      test_stream[intermediate_source_pcoll_cache_key]
      | 'square1' >> beam.Map(lambda e: e)
      | 'reify' >> beam.Map(lambda _: _)
      | cache.WriteCache(
          ie.current_env().get_cache_manager(p_expected), 'unused'))

  # Test that the TestStream is outputting to the correct PCollection.
  class TestStreamVisitor(PipelineVisitor):
    """Remembers the output tags of the TestStream it comes across."""
    def __init__(self):
      self.output_tags = set()

    def enter_composite_transform(self, transform_node):
      self.visit_transform(transform_node)

    def visit_transform(self, transform_node):
      candidate = transform_node.transform
      if not isinstance(candidate, TestStream):
        return
      self.output_tags = candidate.output_tags

  v = TestStreamVisitor()
  actual_pipeline.visit(v)
  self.assertSetEqual({intermediate_source_pcoll_cache_key}, v.output_tags)

  # Test that the pipeline is as expected.
  assert_pipeline_proto_equal(
      self,
      p_expected.to_runner_api(),
      instrumenter.instrumented_pipeline_proto())
def test_instrument_mixed_streaming_batch(self):
  """Checks caching with a batch and a streaming source in one pipeline.

  Both the unbounded source and the cached bounded source are expected to be
  replaced by outputs of a single TestStream.
  """
  # Create the pipeline that will be instrumented.
  from apache_beam.options.pipeline_options import StandardOptions
  options = StandardOptions(streaming=True)
  p_original = beam.Pipeline(interactive_runner.InteractiveRunner(), options)
  streaming_cache_manager = StreamingCache(cache_dir=None)
  ie.current_env().set_cache_manager(streaming_cache_manager, p_original)
  source_1 = p_original | 'source1' >> beam.io.ReadFromPubSub(
      subscription='projects/fake-project/subscriptions/fake_sub')
  source_2 = p_original | 'source2' >> beam.Create([1, 2, 3, 4, 5])

  # pylint: disable=possibly-unused-variable
  pcoll_1 = ((source_1, source_2)
             | beam.Flatten()
             | 'square1' >> beam.Map(lambda x: x * x))

  # Watch the PCollections; only source_2 is mocked as cached and computed.
  ib.watch(locals())
  self._mock_write_cache(
      p_original, [], self.cache_key_of('source_2', source_2))
  ie.current_env().mark_pcollection_computed([source_2])

  # Instrument the original pipeline to create the pipeline the user will see.
  p_copy = beam.Pipeline.from_runner_api(
      p_original.to_runner_api(),
      runner=interactive_runner.InteractiveRunner(),
      options=options)
  ie.current_env().add_derived_pipeline(p_original, p_copy)
  instrumenter = instr.build_pipeline_instrument(p_copy)
  actual_pipeline = beam.Pipeline.from_runner_api(
      proto=instrumenter.instrumented_pipeline_proto(),
      runner=interactive_runner.InteractiveRunner(),
      options=options)

  # Now, build the expected pipeline which replaces the unbounded source with
  # a TestStream.
  source_1_cache_key = self.cache_key_of('source_1', source_1)
  source_2_cache_key = self.cache_key_of('source_2', source_2)
  p_expected = beam.Pipeline()
  ie.current_env().set_cache_manager(streaming_cache_manager, p_expected)
  test_stream = p_expected | TestStream(
      output_tags=[source_1_cache_key, source_2_cache_key])
  # pylint: disable=expression-not-assigned
  ((test_stream[source_1_cache_key], test_stream[source_2_cache_key])
   | beam.Flatten()
   | 'square1' >> beam.Map(lambda x: x * x)
   | 'reify' >> beam.Map(lambda _: _)
   | cache.WriteCache(
       ie.current_env().get_cache_manager(p_expected), 'unused'))

  # Test that the TestStream is outputting to the correct PCollection.
  class TestStreamVisitor(PipelineVisitor):
    """Remembers the output tags of the TestStream it comes across."""
    def __init__(self):
      self.output_tags = set()

    def enter_composite_transform(self, transform_node):
      self.visit_transform(transform_node)

    def visit_transform(self, transform_node):
      candidate = transform_node.transform
      if not isinstance(candidate, TestStream):
        return
      self.output_tags = candidate.output_tags

  v = TestStreamVisitor()
  actual_pipeline.visit(v)
  self.assertSetEqual(
      {source_1_cache_key, source_2_cache_key}, v.output_tags)

  # Test that the pipeline is as expected.
  assert_pipeline_proto_equal(
      self,
      p_expected.to_runner_api(),
      instrumenter.instrumented_pipeline_proto())
def test_instrument_example_unbounded_pipeline_direct_from_source(self):
  """Checks instrumentation when the only PCollection is the source output.
  """
  # Create the pipeline that will be instrumented.
  from apache_beam.options.pipeline_options import StandardOptions
  options = StandardOptions(streaming=True)
  p_original = beam.Pipeline(interactive_runner.InteractiveRunner(), options)
  ie.current_env().set_cache_manager(
      StreamingCache(cache_dir=None), p_original)
  source_1 = p_original | 'source1' >> beam.io.ReadFromPubSub(
      subscription='projects/fake-project/subscriptions/fake_sub')

  # pylint: disable=possibly-unused-variable
  # Watch but do not cache the PCollections.
  ib.watch(locals())

  # Instrument the original pipeline to create the pipeline the user will see.
  p_copy = beam.Pipeline.from_runner_api(
      p_original.to_runner_api(),
      runner=interactive_runner.InteractiveRunner(),
      options=options)
  instrumenter = instr.build_pipeline_instrument(p_copy)
  actual_pipeline = beam.Pipeline.from_runner_api(
      proto=instrumenter.instrumented_pipeline_proto(),
      runner=interactive_runner.InteractiveRunner(),
      options=options)

  # Now, build the expected pipeline which replaces the unbounded source with
  # a TestStream.
  source_1_cache_key = self.cache_key_of('source_1', source_1)
  p_expected = beam.Pipeline()
  # pylint: disable=unused-variable
  test_stream = p_expected | TestStream(output_tags=[source_1_cache_key])

  # Test that the TestStream is outputting to the correct PCollection.
  class TestStreamVisitor(PipelineVisitor):
    """Remembers the output tags of the TestStream it comes across."""
    def __init__(self):
      self.output_tags = set()

    def enter_composite_transform(self, transform_node):
      self.visit_transform(transform_node)

    def visit_transform(self, transform_node):
      candidate = transform_node.transform
      if not isinstance(candidate, TestStream):
        return
      self.output_tags = candidate.output_tags

  v = TestStreamVisitor()
  actual_pipeline.visit(v)
  self.assertSetEqual({source_1_cache_key}, v.output_tags)

  # Test that the pipeline is as expected.
  assert_pipeline_proto_equal(
      self,
      p_expected.to_runner_api(),
      instrumenter.instrumented_pipeline_proto())
def test_instrument_example_unbounded_pipeline_to_read_cache_not_cached(self):
  """Checks instrumentation when nothing was written to the cache.
  """
  # Create the pipeline that will be instrumented.
  from apache_beam.options.pipeline_options import StandardOptions
  options = StandardOptions(streaming=True)
  p_original = beam.Pipeline(interactive_runner.InteractiveRunner(), options)
  ie.current_env().set_cache_manager(
      StreamingCache(cache_dir=None), p_original)
  source_1 = p_original | 'source1' >> beam.io.ReadFromPubSub(
      subscription='projects/fake-project/subscriptions/fake_sub')
  # pylint: disable=possibly-unused-variable
  pcoll_1 = source_1 | 'square1' >> beam.Map(lambda x: x * x)

  # Watch but do not cache the PCollections.
  ib.watch(locals())

  # Instrument the original pipeline to create the pipeline the user will see.
  p_copy = beam.Pipeline.from_runner_api(
      p_original.to_runner_api(),
      runner=interactive_runner.InteractiveRunner(),
      options=options)
  instrumenter = instr.build_pipeline_instrument(p_copy)
  actual_pipeline = beam.Pipeline.from_runner_api(
      proto=instrumenter.instrumented_pipeline_proto(),
      runner=interactive_runner.InteractiveRunner(),
      options=options)

  # Now, build the expected pipeline which replaces the unbounded source with
  # a TestStream.
  source_1_cache_key = self.cache_key_of('source_1', source_1)
  p_expected = beam.Pipeline()
  ie.current_env().set_cache_manager(
      StreamingCache(cache_dir=None), p_expected)
  test_stream = p_expected | TestStream(output_tags=[source_1_cache_key])
  # pylint: disable=expression-not-assigned
  (
      test_stream[source_1_cache_key]
      | 'square1' >> beam.Map(lambda x: x * x)
      | 'reify' >> beam.Map(lambda _: _)
      | cache.WriteCache(
          ie.current_env().get_cache_manager(p_expected), 'unused'))

  # Test that the TestStream is outputting to the correct PCollection.
  class TestStreamVisitor(PipelineVisitor):
    """Remembers the output tags of the TestStream it comes across."""
    def __init__(self):
      self.output_tags = set()

    def enter_composite_transform(self, transform_node):
      self.visit_transform(transform_node)

    def visit_transform(self, transform_node):
      candidate = transform_node.transform
      if not isinstance(candidate, TestStream):
        return
      self.output_tags = candidate.output_tags

  v = TestStreamVisitor()
  actual_pipeline.visit(v)
  self.assertSetEqual({source_1_cache_key}, v.output_tags)

  # Test that the pipeline is as expected.
  assert_pipeline_proto_equal(
      self,
      p_expected.to_runner_api(),
      instrumenter.instrumented_pipeline_proto())
def test_instrument_example_unbounded_pipeline_to_multiple_read_cache(self):
  """Checks instrumentation of a pipeline with two cached unbounded sources.
  """
  # Create the pipeline that will be instrumented.
  p_original = beam.Pipeline(interactive_runner.InteractiveRunner())
  ie.current_env().set_cache_manager(
      StreamingCache(cache_dir=None), p_original)
  source_1 = p_original | 'source1' >> beam.io.ReadFromPubSub(
      subscription='projects/fake-project/subscriptions/fake_sub')
  source_2 = p_original | 'source2' >> beam.io.ReadFromPubSub(
      subscription='projects/fake-project/subscriptions/fake_sub')
  # pylint: disable=possibly-unused-variable
  pcoll_1 = source_1 | 'square1' >> beam.Map(lambda x: x * x)
  # pylint: disable=possibly-unused-variable
  pcoll_2 = source_2 | 'square2' >> beam.Map(lambda x: x * x)

  # Mock as if cacheable PCollections are cached: every local variable that
  # holds a PCollection gets an empty cache entry under its expected key.
  ib.watch(locals())
  for name, pcoll in locals().items():
    if isinstance(pcoll, beam.pvalue.PCollection):
      cache_key = self.cache_key_of(name, pcoll)
      self._mock_write_cache(p_original, [], cache_key)

  # Instrument the original pipeline to create the pipeline the user will see.
  instrumenter = instr.build_pipeline_instrument(p_original)
  actual_pipeline = beam.Pipeline.from_runner_api(
      proto=instrumenter.instrumented_pipeline_proto(),
      runner=interactive_runner.InteractiveRunner(),
      options=None)

  # Now, build the expected pipeline which replaces the unbounded source with
  # a TestStream.
  source_1_cache_key = self.cache_key_of('source_1', source_1)
  source_2_cache_key = self.cache_key_of('source_2', source_2)
  p_expected = beam.Pipeline()
  test_stream = p_expected | TestStream(
      output_tags=[source_1_cache_key, source_2_cache_key])
  # pylint: disable=expression-not-assigned
  test_stream[source_1_cache_key] | 'square1' >> beam.Map(lambda x: x * x)
  # pylint: disable=expression-not-assigned
  test_stream[source_2_cache_key] | 'square2' >> beam.Map(lambda x: x * x)

  # Test that the TestStream is outputting to the correct PCollection.
  class TestStreamVisitor(PipelineVisitor):
    """Remembers the output tags of the TestStream it comes across."""
    def __init__(self):
      self.output_tags = set()

    def enter_composite_transform(self, transform_node):
      self.visit_transform(transform_node)

    def visit_transform(self, transform_node):
      candidate = transform_node.transform
      if not isinstance(candidate, TestStream):
        return
      self.output_tags = candidate.output_tags

  v = TestStreamVisitor()
  actual_pipeline.visit(v)
  self.assertSetEqual(
      {source_1_cache_key, source_2_cache_key}, v.output_tags)

  # Test that the pipeline is as expected.
  assert_pipeline_proto_equal(
      self,
      p_expected.to_runner_api(),
      instrumenter.instrumented_pipeline_proto())
def test_pipeline_pruned_when_input_pcoll_is_cached(self):
  """The instrumented proto has one subtransform less than the full one."""
  # Id of the transform whose subtransforms are counted below.
  transform_id = 'ref_AppliedPTransform_AppliedPTransform_1'

  user_pipeline, init_pcoll, _ = self._example_pipeline()
  runner_pipeline = beam.Pipeline.from_runner_api(
      user_pipeline.to_runner_api(), user_pipeline.runner, None)
  ie.current_env().add_derived_pipeline(user_pipeline, runner_pipeline)

  # Mock as if init_pcoll is cached.
  init_pcoll_cache_key = self.cache_key_of('init_pcoll', init_pcoll)
  self._mock_write_cache(
      user_pipeline, [b'1', b'2', b'3'], init_pcoll_cache_key)
  ie.current_env().mark_pcollection_computed([init_pcoll])

  # Build an instrument from the runner pipeline.
  pipeline_instrument = instr.build_pipeline_instrument(runner_pipeline)
  pruned_proto = pipeline_instrument.instrumented_pipeline_proto()
  # Taken from the instrument's pipeline directly, without the prune step, for
  # comparison: it still holds the sub-graph that produces init_pcoll even
  # though that sub-graph is no longer useful.
  full_proto = pipeline_instrument._pipeline.to_runner_api()

  pruned_subtransforms = (
      pruned_proto.components.transforms[transform_id].subtransforms)
  self.assertEqual(len(pruned_subtransforms), 5)
  # NOTE(review): this label uses a space while the one checked on full_proto
  # below uses a hyphen -- confirm both spellings are intended.
  assert_pipeline_proto_not_contain_top_level_transform(
      self, pruned_proto, 'Init Source')

  full_subtransforms = (
      full_proto.components.transforms[transform_id].subtransforms)
  self.assertEqual(len(full_subtransforms), 6)
  assert_pipeline_proto_contain_top_level_transform(
      self, full_proto, 'Init-Source')
def test_side_effect_pcoll_is_included(self):
  """An unassigned, unwatched transform still yields extended targets."""
  runner = interactive_runner.InteractiveRunner()
  pipeline_with_side_effect = beam.Pipeline(runner)
  ie.current_env().set_cache_manager(
      InMemoryCache(), pipeline_with_side_effect)
  # The result is deliberately not bound to a variable, which makes this a
  # "side effect" transform. Nothing defined locally for this pipeline is
  # ever watched either.
  # pylint: disable=range-builtin-not-iterating,expression-not-assigned
  pipeline_with_side_effect | 'Init Create' >> beam.Create(range(10))
  pipeline_instrument = instr.build_pipeline_instrument(
      pipeline_with_side_effect)
  self.assertTrue(pipeline_instrument._extended_targets)
# Run the tests of this module when the file is executed as a script.
if __name__ == '__main__':
  unittest.main()