blob: 6354184625cb8827d880f0b0f6edd78d7b2cf16d [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
import time
import unittest
from unittest.mock import MagicMock
import apache_beam as beam
from apache_beam import coders
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.portability.api.beam_interactive_api_pb2 import TestStreamFileRecord
from apache_beam.runners.interactive import background_caching_job as bcj
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import pipeline_instrument as pi
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.interactive.options.capture_limiters import Limiter
from apache_beam.runners.interactive.recording_manager import ElementStream
from apache_beam.runners.interactive.recording_manager import Recording
from apache_beam.runners.interactive.recording_manager import RecordingManager
from apache_beam.runners.interactive.testing.test_cache_manager import FileRecordsBuilder
from apache_beam.runners.interactive.testing.test_cache_manager import InMemoryCache
from apache_beam.runners.runner import PipelineState
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.test_stream import WindowedValueHolder
from apache_beam.transforms.window import GlobalWindow
from apache_beam.utils.timestamp import MIN_TIMESTAMP
from apache_beam.utils.windowed_value import WindowedValue
class MockPipelineResult(beam.runners.runner.PipelineResult):
"""Mock class for controlling a PipelineResult."""
def __init__(self):
self._state = PipelineState.RUNNING
def wait_until_finish(self):
pass
def set_state(self, state):
self._state = state
@property
def state(self):
return self._state
def cancel(self):
self._state = PipelineState.CANCELLED
class ElementStreamTest(unittest.TestCase):
def setUp(self):
self.cache = InMemoryCache()
self.p = beam.Pipeline()
self.pcoll = self.p | beam.Create([])
self.cache_key = str(pi.CacheKey('pcoll', '', '', ''))
# Create a MockPipelineResult to control the state of a fake run of the
# pipeline.
self.mock_result = MockPipelineResult()
ie.current_env().track_user_pipelines()
ie.current_env().set_pipeline_result(self.p, self.mock_result)
ie.current_env().set_cache_manager(self.cache, self.p)
def test_read(self):
"""Test reading and if a stream is done no more elements are returned."""
self.mock_result.set_state(PipelineState.DONE)
self.cache.write(['expected'], 'full', self.cache_key)
self.cache.save_pcoder(None, 'full', self.cache_key)
stream = ElementStream(
self.pcoll, '', self.cache_key, max_n=1, max_duration_secs=1)
self.assertFalse(stream.is_done())
self.assertEqual(list(stream.read())[0], 'expected')
self.assertTrue(stream.is_done())
def test_done_if_terminated(self):
"""Test that terminating the job sets the stream as done."""
self.cache.write(['expected'], 'full', self.cache_key)
self.cache.save_pcoder(None, 'full', self.cache_key)
stream = ElementStream(
self.pcoll, '', self.cache_key, max_n=100, max_duration_secs=10)
self.assertFalse(stream.is_done())
self.assertEqual(list(stream.read(tail=False))[0], 'expected')
# The limiters were not reached, so the stream is not done yet.
self.assertFalse(stream.is_done())
self.mock_result.set_state(PipelineState.DONE)
self.assertEqual(list(stream.read(tail=False))[0], 'expected')
# The underlying pipeline is terminated, so the stream won't yield new
# elements.
self.assertTrue(stream.is_done())
def test_read_n(self):
"""Test that the stream only reads 'n' elements."""
self.mock_result.set_state(PipelineState.DONE)
self.cache.write(list(range(5)), 'full', self.cache_key)
self.cache.save_pcoder(None, 'full', self.cache_key)
stream = ElementStream(
self.pcoll, '', self.cache_key, max_n=1, max_duration_secs=1)
self.assertEqual(list(stream.read()), [0])
self.assertTrue(stream.is_done())
stream = ElementStream(
self.pcoll, '', self.cache_key, max_n=2, max_duration_secs=1)
self.assertEqual(list(stream.read()), [0, 1])
self.assertTrue(stream.is_done())
stream = ElementStream(
self.pcoll, '', self.cache_key, max_n=5, max_duration_secs=1)
self.assertEqual(list(stream.read()), list(range(5)))
self.assertTrue(stream.is_done())
# Test that if the user asks for more than in the cache it still returns.
stream = ElementStream(
self.pcoll, '', self.cache_key, max_n=10, max_duration_secs=1)
self.assertEqual(list(stream.read()), list(range(5)))
self.assertTrue(stream.is_done())
def test_read_duration(self):
"""Test that the stream only reads a 'duration' of elements."""
def as_windowed_value(element):
return WindowedValueHolder(WindowedValue(element, 0, []))
values = (FileRecordsBuilder(tag=self.cache_key)
.advance_processing_time(1)
.add_element(element=as_windowed_value(0), event_time_secs=0)
.advance_processing_time(1)
.add_element(element=as_windowed_value(1), event_time_secs=1)
.advance_processing_time(1)
.add_element(element=as_windowed_value(2), event_time_secs=3)
.advance_processing_time(1)
.add_element(element=as_windowed_value(3), event_time_secs=4)
.advance_processing_time(1)
.add_element(element=as_windowed_value(4), event_time_secs=5)
.build()) # yapf: disable
values = [
v.recorded_event for v in values if isinstance(v, TestStreamFileRecord)
]
self.mock_result.set_state(PipelineState.DONE)
self.cache.write(values, 'full', self.cache_key)
self.cache.save_pcoder(coders.FastPrimitivesCoder(), 'full', self.cache_key)
# The following tests a progression of reading different durations from the
# cache.
stream = ElementStream(
self.pcoll, '', self.cache_key, max_n=100, max_duration_secs=1)
self.assertSequenceEqual([e.value for e in stream.read()], [0])
stream = ElementStream(
self.pcoll, '', self.cache_key, max_n=100, max_duration_secs=2)
self.assertSequenceEqual([e.value for e in stream.read()], [0, 1])
stream = ElementStream(
self.pcoll, '', self.cache_key, max_n=100, max_duration_secs=10)
self.assertSequenceEqual([e.value for e in stream.read()], [0, 1, 2, 3, 4])
class RecordingTest(unittest.TestCase):
def test_computed(self):
"""Tests that a PCollection is marked as computed only in a complete state.
Because the background caching job is now long-lived, repeated runs of a
PipelineFragment may yield different results for the same PCollection.
"""
p = beam.Pipeline(InteractiveRunner())
elems = p | beam.Create([0, 1, 2])
ib.watch(locals())
# Create a MockPipelineResult to control the state of a fake run of the
# pipeline.
mock_result = MockPipelineResult()
ie.current_env().track_user_pipelines()
ie.current_env().set_pipeline_result(p, mock_result)
# Create a mock BackgroundCachingJob that will control whether to set the
# PCollections as computed or not.
bcj_mock_result = MockPipelineResult()
background_caching_job = bcj.BackgroundCachingJob(bcj_mock_result, [])
# Create a recording.
recording = Recording(
p, [elems],
mock_result,
pi.PipelineInstrument(p),
max_n=10,
max_duration_secs=60)
# The background caching job and the recording isn't done yet so there may
# be more elements to be recorded.
self.assertFalse(recording.is_computed())
self.assertFalse(recording.computed())
self.assertTrue(recording.uncomputed())
# The recording is finished but the background caching job is not. There
# may still be more elements to record, or the intermediate PCollection may
# have stopped caching in an incomplete state, e.g. before a window could
# fire.
mock_result.set_state(PipelineState.DONE)
recording.wait_until_finish()
self.assertFalse(recording.is_computed())
self.assertFalse(recording.computed())
self.assertTrue(recording.uncomputed())
# The background caching job finished before we started a recording which
# is a sure signal that there will be no more elements.
bcj_mock_result.set_state(PipelineState.DONE)
ie.current_env().set_background_caching_job(p, background_caching_job)
recording = Recording(
p, [elems],
mock_result,
pi.PipelineInstrument(p),
max_n=10,
max_duration_secs=60)
recording.wait_until_finish()
# There are no more elements and the recording finished, meaning that the
# intermediate PCollections are in a complete state. They can now be marked
# as computed.
self.assertTrue(recording.is_computed())
self.assertTrue(recording.computed())
self.assertFalse(recording.uncomputed())
def test_describe(self):
p = beam.Pipeline(InteractiveRunner())
numbers = p | 'numbers' >> beam.Create([0, 1, 2])
letters = p | 'letters' >> beam.Create(['a', 'b', 'c'])
ib.watch(locals())
# Create a MockPipelineResult to control the state of a fake run of the
# pipeline.
mock_result = MockPipelineResult()
ie.current_env().track_user_pipelines()
ie.current_env().set_pipeline_result(p, mock_result)
cache_manager = InMemoryCache()
ie.current_env().set_cache_manager(cache_manager, p)
# Create a recording with an arbitrary start time.
recording = Recording(
p, [numbers, letters],
mock_result,
pi.PipelineInstrument(p),
max_n=10,
max_duration_secs=60)
# Get the cache key of the stream and write something to cache. This is
# so that a pipeline doesn't have to run in the test.
numbers_stream = recording.stream(numbers)
cache_manager.write([0, 1, 2], 'full', numbers_stream.cache_key)
cache_manager.save_pcoder(None, 'full', numbers_stream.cache_key)
letters_stream = recording.stream(letters)
cache_manager.write(['a', 'b', 'c'], 'full', letters_stream.cache_key)
cache_manager.save_pcoder(None, 'full', letters_stream.cache_key)
# Get the description.
description = recording.describe()
size = description['size']
self.assertEqual(
size,
cache_manager.size('full', numbers_stream.cache_key) +
cache_manager.size('full', letters_stream.cache_key))
class RecordingManagerTest(unittest.TestCase):
def test_basic_execution(self):
"""A basic pipeline to be used as a smoke test."""
# Create the pipeline that will emit 0, 1, 2.
p = beam.Pipeline(InteractiveRunner())
numbers = p | 'numbers' >> beam.Create([0, 1, 2])
letters = p | 'letters' >> beam.Create(['a', 'b', 'c'])
# Watch the pipeline and PCollections. This is normally done in a notebook
# environment automatically, but we have to do it manually here.
ib.watch(locals())
ie.current_env().track_user_pipelines()
# Create the recording objects. By calling `record` a new PipelineFragment
# is started to compute the given PCollections and cache to disk.
rm = RecordingManager(p)
numbers_recording = rm.record([numbers], max_n=3, max_duration=500)
numbers_stream = numbers_recording.stream(numbers)
numbers_recording.wait_until_finish()
# Once the pipeline fragment completes, we can read from the stream and know
# that all elements were written to cache.
elems = list(numbers_stream.read())
expected_elems = [
WindowedValue(i, MIN_TIMESTAMP, [GlobalWindow()]) for i in range(3)
]
self.assertListEqual(elems, expected_elems)
# Make an extra recording and test the description.
letters_recording = rm.record([letters], max_n=3, max_duration=500)
letters_recording.wait_until_finish()
self.assertEqual(
rm.describe()['size'],
numbers_recording.describe()['size'] +
letters_recording.describe()['size'])
rm.cancel()
def test_duration_parsing(self):
p = beam.Pipeline(InteractiveRunner())
elems = p | beam.Create([0, 1, 2])
# Watch the pipeline and PCollections. This is normally done in a notebook
# environment automatically, but we have to do it manually here.
ib.watch(locals())
ie.current_env().track_user_pipelines()
# Create the recording objects.
rm = RecordingManager(p)
recording = rm.record([elems], max_n=3, max_duration='500s')
recording.wait_until_finish()
# Assert that the duration was parsed correctly to integer seconds.
self.assertEqual(recording.describe()['duration'], 500)
def test_cancel_stops_recording(self):
# Add the TestStream so that it can be cached.
ib.options.recordable_sources.add(TestStream)
p = beam.Pipeline(
InteractiveRunner(), options=PipelineOptions(streaming=True))
elems = (
p
| TestStream().advance_watermark_to(0).advance_processing_time(
1).add_elements(list(range(10))).advance_processing_time(1))
squares = elems | beam.Map(lambda x: x**2)
# Watch the local scope for Interactive Beam so that referenced PCollections
# will be cached.
ib.watch(locals())
# This is normally done in the interactive_utils when a transform is
# applied but needs an IPython environment. So we manually run this here.
ie.current_env().track_user_pipelines()
class SemaphoreLimiter(Limiter):
def __init__(self):
self.triggered = False
def is_triggered(self):
return self.triggered
# Get the recording then the BackgroundCachingJob.
semaphore_limiter = SemaphoreLimiter()
rm = RecordingManager(p, test_limiters=[semaphore_limiter])
rm.record([squares], max_n=10, max_duration=500)
# The BackgroundCachingJob is still waiting for more elements, so it isn't
# done yet.
bcj = ie.current_env().get_background_caching_job(p)
self.assertFalse(bcj.is_done())
# Assert that something was read and that the BackgroundCachingJob was
# sucessfully stopped.
# self.assertTrue(list(recording.stream(squares).read()))
semaphore_limiter.triggered = True
rm.cancel()
self.assertTrue(bcj.is_done())
def test_recording_manager_clears_cache(self):
"""Tests that the RecordingManager clears the cache before recording.
A job may have incomplete PCollections when the job terminates. Clearing the
cache ensures that correct results are computed every run.
"""
# Add the TestStream so that it can be cached.
ib.options.recordable_sources.add(TestStream)
p = beam.Pipeline(
InteractiveRunner(), options=PipelineOptions(streaming=True))
elems = (
p
| TestStream().advance_watermark_to(0).advance_processing_time(
1).add_elements(list(range(10))).advance_processing_time(1))
squares = elems | beam.Map(lambda x: x**2)
# Watch the local scope for Interactive Beam so that referenced PCollections
# will be cached.
ib.watch(locals())
# This is normally done in the interactive_utils when a transform is
# applied but needs an IPython environment. So we manually run this here.
ie.current_env().track_user_pipelines()
# Do the first recording to get the timestamp of the first time the fragment
# was run.
rm = RecordingManager(p)
# Get the cache, key, and coder to read the PCollection from the cache.
pipeline_instrument = pi.PipelineInstrument(p)
# Set up a mock for the Cache's clear function which will be used to clear
# uncomputed PCollections.
rm._clear_pcolls = MagicMock()
rm.record([squares], max_n=1, max_duration=500)
rm.cancel()
# Assert that the cache cleared the PCollection.
rm._clear_pcolls.assert_any_call(
unittest.mock.ANY,
set(pipeline_instrument.cache_key(pc) for pc in (elems, squares)))
def test_clear(self):
"""Tests that clear can empty the cache for a specific pipeline."""
# Create two pipelines so we can check that clearing the cache won't clear
# all defined pipelines.
p1 = beam.Pipeline(InteractiveRunner())
elems_1 = p1 | 'elems 1' >> beam.Create([0, 1, 2])
p2 = beam.Pipeline(InteractiveRunner())
elems_2 = p2 | 'elems 2' >> beam.Create([0, 1, 2])
# Watch the pipeline and PCollections. This is normally done in a notebook
# environment automatically, but we have to do it manually here.
ib.watch(locals())
ie.current_env().track_user_pipelines()
# Create the recording objects. By calling `record` a new PipelineFragment
# is started to compute the given PCollections and cache to disk.
rm_1 = RecordingManager(p1)
recording = rm_1.record([elems_1], max_n=3, max_duration=500)
recording.wait_until_finish()
rm_2 = RecordingManager(p2)
recording = rm_2.record([elems_2], max_n=3, max_duration=500)
recording.wait_until_finish()
# Assert that clearing only one recording clears that recording.
self.assertGreater(rm_1.describe()['size'], 0)
self.assertGreater(rm_2.describe()['size'], 0)
rm_1.clear()
self.assertEqual(rm_1.describe()['size'], 0)
self.assertGreater(rm_2.describe()['size'], 0)
rm_2.clear()
self.assertEqual(rm_2.describe()['size'], 0)
def test_record_pipeline(self):
# Add the TestStream so that it can be cached.
ib.options.recordable_sources.add(TestStream)
p = beam.Pipeline(
InteractiveRunner(), options=PipelineOptions(streaming=True))
# pylint: disable=unused-variable
_ = (p
| TestStream()
.advance_watermark_to(0)
.advance_processing_time(1)
.add_elements(list(range(10)))
.advance_processing_time(1)) # yapf: disable
# Watch the local scope for Interactive Beam so that referenced PCollections
# will be cached.
ib.watch(locals())
# This is normally done in the interactive_utils when a transform is
# applied but needs an IPython environment. So we manually run this here.
ie.current_env().track_user_pipelines()
# Create a lmiter that stops the background caching job when something is
# written to cache. This is used to make ensure that the pipeline is
# functioning properly and that there are no data races with the test.
class SizeLimiter(Limiter):
def __init__(self, p):
self.pipeline = p
self._rm = None
def set_recording_manager(self, rm):
self._rm = rm
def is_triggered(self):
return self._rm.describe()['size'] > 0 if self._rm else False
# Do the first recording to get the timestamp of the first time the fragment
# was run.
size_limiter = SizeLimiter(p)
rm = RecordingManager(p, test_limiters=[size_limiter])
size_limiter.set_recording_manager(rm)
self.assertEqual(rm.describe()['state'], PipelineState.STOPPED)
self.assertTrue(rm.record_pipeline())
# A recording is in progress, no need to start another one.
self.assertFalse(rm.record_pipeline())
for _ in range(60):
if rm.describe()['state'] == PipelineState.CANCELLED:
break
time.sleep(1)
self.assertTrue(
rm.describe()['state'] == PipelineState.CANCELLED,
'Test timed out waiting for pipeline to be cancelled. This indicates '
'that the BackgroundCachingJob did not cache anything.')
if __name__ == '__main__':
unittest.main()