#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import importlib
import json
import logging
import tempfile
import unittest
from typing import NamedTuple
from unittest.mock import PropertyMock
from unittest.mock import patch
import numpy as np
import pandas as pd
import pytest
import apache_beam as beam
from apache_beam import coders
from apache_beam.dataframe.convert import to_dataframe
from apache_beam.portability.api import beam_runner_api_pb2
from apache_beam.runners.interactive import interactive_beam as ib
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import utils
from apache_beam.runners.interactive.caching.cacheable import Cacheable
from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
from apache_beam.runners.interactive.testing.mock_ipython import mock_get_ipython
from apache_beam.runners.interactive.testing.test_cache_manager import InMemoryCache
from apache_beam.runners.portability.flink_runner import FlinkRunner
from apache_beam.testing.test_stream import WindowedValueHolder
from apache_beam.utils.timestamp import Timestamp
from apache_beam.utils.windowed_value import WindowedValue
# Protect against environments where the google.cloud libraries are not
# available.
try:
from google.cloud.exceptions import BadRequest
from google.cloud.exceptions import NotFound
except ImportError:
_http_error_imported = False
else:
_http_error_imported = True
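# Fake google.cloud.storage.Client used by GCSUtilsTest below. Well-known
# bucket names are mapped to the NotFound and BadRequest errors the real
# client would raise, so utils.assert_bucket_exists can be exercised without
# touching GCS.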
class MockStorageClient:
def __init__(self):
pass
def get_bucket(self, path):
if path == 'test-bucket-not-found':
raise NotFound('Bucket not found')
elif path == 'test-bucket-not-verified':
raise BadRequest('Request faulty')
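# Simple schema'd NamedTuple used as the element type in the DataFrame,
# Series and variable-creation tests below.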
class Record(NamedTuple):
order_id: int
product_id: int
quantity: int
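# Wraps an element in a WindowedValue at timestamp 1 (in seconds) within the
# GlobalWindow; tests run with include_window_info=True therefore expect an
# event_time of int(1e6) microseconds.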
def windowed_value(e):
from apache_beam.transforms.window import GlobalWindow
return WindowedValue(e, 1, [GlobalWindow()])
class ParseToDataframeTest(unittest.TestCase):
def test_parse_windowedvalue(self):
"""Tests that WindowedValues are supported but not present.
"""
els = [windowed_value(('a', 2)), windowed_value(('b', 3))]
actual_df = utils.elements_to_df(els, include_window_info=False)
expected_df = pd.DataFrame([['a', 2], ['b', 3]], columns=[0, 1])
# check_like so that ordering of indices doesn't matter.
pd.testing.assert_frame_equal(actual_df, expected_df, check_like=True)
def test_parse_windowedvalue_with_window_info(self):
"""Tests that WindowedValues are supported and have their own columns.
"""
els = [windowed_value(('a', 2)), windowed_value(('b', 3))]
actual_df = utils.elements_to_df(els, include_window_info=True)
expected_df = pd.DataFrame(
[['a', 2, int(1e6), els[0].windows, els[0].pane_info],
['b', 3, int(1e6), els[1].windows, els[1].pane_info]],
columns=[0, 1, 'event_time', 'windows', 'pane_info'])
# check_like so that ordering of indices doesn't matter.
pd.testing.assert_frame_equal(actual_df, expected_df, check_like=True)
def test_parse_windowedvalue_with_dicts(self):
"""Tests that dicts play well with WindowedValues.
"""
els = [
windowed_value({
'b': 2, 'd': 4
}),
windowed_value({
'a': 1, 'b': 2, 'c': 3
})
]
actual_df = utils.elements_to_df(els, include_window_info=True)
expected_df = pd.DataFrame(
[[np.nan, 2, np.nan, 4, int(1e6), els[0].windows, els[0].pane_info],
[1, 2, 3, np.nan, int(1e6), els[1].windows, els[1].pane_info]],
columns=['a', 'b', 'c', 'd', 'event_time', 'windows', 'pane_info'])
# check_like so that ordering of indices doesn't matter.
pd.testing.assert_frame_equal(actual_df, expected_df, check_like=True)
def test_parse_dataframes(self):
"""Tests that it correctly parses a DataFrame.
"""
deferred = to_dataframe(beam.Pipeline() | beam.Create([Record(0, 0, 0)]))
els = [windowed_value(pd.DataFrame(Record(n, 0, 0))) for n in range(10)]
actual_df = utils.elements_to_df(
els, element_type=deferred._expr.proxy()).reset_index(drop=True)
expected_df = pd.concat([e.value for e in els], ignore_index=True)
pd.testing.assert_frame_equal(actual_df, expected_df)
def test_parse_series(self):
"""Tests that it correctly parses a Pandas Series.
"""
deferred = to_dataframe(beam.Pipeline()
| beam.Create([Record(0, 0, 0)]))['order_id']
els = [windowed_value(pd.Series([n])) for n in range(10)]
actual_df = utils.elements_to_df(
els, element_type=deferred._expr.proxy()).reset_index(drop=True)
expected_df = pd.concat([e.value for e in els], ignore_index=True)
pd.testing.assert_series_equal(actual_df, expected_df)
class ToElementListTest(unittest.TestCase):
def test_test_stream_payload_events(self):
"""Tests that the to_element_list can limit the count in a single bundle."""
coder = coders.FastPrimitivesCoder()
def reader():
element_payload = [
beam_runner_api_pb2.TestStreamPayload.TimestampedElement(
encoded_element=coder.encode(
WindowedValueHolder(WindowedValue(e, 0, []))),
timestamp=Timestamp.of(0).micros) for e in range(10)
]
event = beam_runner_api_pb2.TestStreamPayload.Event(
element_event=beam_runner_api_pb2.TestStreamPayload.Event.AddElements(
elements=element_payload))
yield event
# The reader creates 10 elements in a single TestStreamPayload, but we limit
# the number of elements read to 5 here. This tests that to_element_list can
# cap the number of elements taken from a single bundle.
elements = utils.to_element_list(
reader(), coder, include_window_info=False, n=5)
self.assertSequenceEqual(list(elements), list(range(5)))
def test_element_limit_count(self):
"""Tests that the to_element_list can limit the count."""
elements = utils.to_element_list(
iter(range(10)), None, include_window_info=False, n=5)
self.assertSequenceEqual(list(elements), list(range(5)))
@unittest.skipIf(
not ie.current_env().is_interactive_ready,
'[interactive] dependency is not installed.')
class IPythonLogHandlerTest(unittest.TestCase):
def setUp(self):
utils.register_ipython_log_handler()
self._interactive_root_logger = logging.getLogger(
'apache_beam.runners.interactive')
def test_ipython_log_handler_not_double_registered(self):
utils.register_ipython_log_handler()
ipython_log_handlers = list(
filter(
lambda x: isinstance(x, utils.IPythonLogHandler),
[handler for handler in self._interactive_root_logger.handlers]))
self.assertEqual(1, len(ipython_log_handlers))
@patch('apache_beam.runners.interactive.utils.IPythonLogHandler.emit')
def test_default_logging_level_is_info(self, mock_emit):
# By default, the logging level of loggers and log handlers is NOTSET, and
# propagation is enabled for all loggers. In this scenario, all log records
# from child loggers propagate to the interactive "root" logger, which is set
# to INFO level and handled by its sole log handler, IPythonLogHandler, whose
# level is NOTSET. The effect is that everything at INFO level or above is
# logged through IPython.display to all frontends connected to the current
# kernel.
dummy_logger = logging.getLogger('apache_beam.runners.interactive.dummy1')
dummy_logger.info('info')
mock_emit.assert_called_once()
dummy_logger.debug('debug')
# emit is not called for the DEBUG record, so the call count is still one.
mock_emit.assert_called_once()
@patch('apache_beam.runners.interactive.utils.IPythonLogHandler.emit')
def test_child_module_logger_can_override_logging_level(self, mock_emit):
# When a child logger's level is configured to something other than NOTSET,
# it takes back logging control from the interactive "root" logger: records
# below its own level are dropped before they can propagate.
dummy_logger = logging.getLogger('apache_beam.runners.interactive.dummy2')
dummy_logger.setLevel(logging.DEBUG)
mock_emit.assert_not_called()
dummy_logger.debug('debug')
# Because the dummy child logger is configured at DEBUG level, it now
# propagates DEBUG records to the interactive "root" logger.
mock_emit.assert_called_once()
# When the dummy child logger is configured at CRITICAL level, it only
# propagates CRITICAL records to the interactive "root" logger.
dummy_logger.setLevel(logging.CRITICAL)
# ERROR records are not handled now.
dummy_logger.error('error')
# emit is not called for the ERROR record, so the call count is still one.
mock_emit.assert_called_once()
@unittest.skipIf(
not ie.current_env().is_interactive_ready,
'[interactive] dependency is not installed.')
@pytest.mark.skipif(
not ie.current_env().is_interactive_ready,
reason='[interactive] dependency is not installed.')
class ProgressIndicatorTest(unittest.TestCase):
def setUp(self):
ie.new_env()
@patch('IPython.get_ipython', new_callable=mock_get_ipython)
@patch(
'apache_beam.runners.interactive.interactive_environment'
'.InteractiveEnvironment.is_in_notebook',
new_callable=PropertyMock)
def test_progress_in_plain_text_when_not_in_notebook(
self, mocked_is_in_notebook, unused):
mocked_is_in_notebook.return_value = False
with patch('IPython.display.display') as mocked_display:
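# In plain-text mode, the decorator should display a 'Processing...' message
# before the wrapped function runs and 'Done.' once it returns.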
@utils.progress_indicated
def progress_indicated_dummy():
mocked_display.assert_any_call('Processing... progress_indicated_dummy')
progress_indicated_dummy()
mocked_display.assert_any_call('Done.')
@patch('IPython.get_ipython', new_callable=mock_get_ipython)
@patch(
'apache_beam.runners.interactive.interactive_environment'
'.InteractiveEnvironment.is_in_notebook',
new_callable=PropertyMock)
def test_progress_in_HTML_JS_when_in_notebook(
self, mocked_is_in_notebook, unused):
mocked_is_in_notebook.return_value = True
with patch('IPython.display.HTML') as mocked_html,\
patch('IPython.display.Javascript') as mocked_js:
with utils.ProgressIndicator('enter', 'exit'):
mocked_html.assert_called()
mocked_js.assert_called()
@unittest.skipIf(
not ie.current_env().is_interactive_ready,
'[interactive] dependency is not installed.')
class MessagingUtilTest(unittest.TestCase):
SAMPLE_DATA = {'a': [1, 2, 3], 'b': 4, 'c': '5', 'd': {'e': 'f'}}
def setUp(self):
ie.new_env()
def test_as_json_decorator(self):
@utils.as_json
def dummy():
return MessagingUtilTest.SAMPLE_DATA
# As of Python 3.7 (and CPython 3.6 as an implementation detail),
# dictionaries preserve the insertion order of their items.
self.assertEqual(json.loads(dummy()), MessagingUtilTest.SAMPLE_DATA)
class GeneralUtilTest(unittest.TestCase):
def test_pcoll_by_name(self):
p = beam.Pipeline()
pcoll = p | beam.Create([1])
ib.watch({'p': p, 'pcoll': pcoll})
name_to_pcoll = utils.pcoll_by_name()
self.assertIn('pcoll', name_to_pcoll)
def test_cacheables(self):
p2 = beam.Pipeline()
pcoll2 = p2 | beam.Create([2])
ib.watch({'p2': p2, 'pcoll2': pcoll2})
cacheables = utils.cacheables()
cacheable_key = Cacheable.from_pcoll('pcoll2', pcoll2).to_key()
self.assertIn(cacheable_key, cacheables)
def test_has_unbounded_source(self):
p = beam.Pipeline()
ie.current_env().set_cache_manager(InMemoryCache(), p)
_ = p | 'ReadUnboundedSource' >> beam.io.ReadFromPubSub(
subscription='projects/fake-project/subscriptions/fake_sub')
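# ReadFromPubSub is an unbounded source, so the pipeline should be flagged.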
self.assertTrue(utils.has_unbounded_sources(p))
def test_not_has_unbounded_source(self):
p = beam.Pipeline()
ie.current_env().set_cache_manager(InMemoryCache(), p)
with tempfile.NamedTemporaryFile(delete=False) as f:
f.write(b'test')
_ = p | 'ReadBoundedSource' >> beam.io.ReadFromText(f.name)
self.assertFalse(utils.has_unbounded_sources(p))
def test_find_pcoll_name(self):
p = beam.Pipeline()
pcoll = p | beam.Create([1, 2, 3])
ib.watch({
'p_test_find_pcoll_name': p,
'pcoll_test_find_pcoll_name': pcoll,
})
self.assertEqual('pcoll_test_find_pcoll_name', utils.find_pcoll_name(pcoll))
def test_create_var_in_main(self):
name = 'test_create_var_in_main'
value = Record(0, 0, 0)
_ = utils.create_var_in_main(name, value)
main_session = importlib.import_module('__main__')
self.assertIs(getattr(main_session, name, None), value)
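# The class-level patch below replaces google.cloud.storage.Client with the
# MockStorageClient defined above, so the special bucket names used in these
# tests trigger NotFound and BadRequest responses without any network access.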
@patch('google.cloud.storage.Client', return_value=MockStorageClient())
@unittest.skipIf(not _http_error_imported, 'http errors are not imported.')
class GCSUtilsTest(unittest.TestCase):
@patch('google.cloud.storage.Client.get_bucket')
def test_assert_bucket_exists_not_found(self, mock_response, mock_client):
with self.assertRaises(ValueError):
utils.assert_bucket_exists('test-bucket-not-found')
@patch('google.cloud.storage.Client.get_bucket')
def test_assert_bucket_exists_not_verified(self, mock_response, mock_client):
from apache_beam.runners.interactive.utils import _LOGGER
with self.assertLogs(_LOGGER, level='WARNING'):
utils.assert_bucket_exists('test-bucket-not-verified')
@patch('google.cloud.storage.Client.get_bucket')
def test_assert_bucket_exists_found(self, mock_response, mock_client):
utils.assert_bucket_exists('')
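# detect_pipeline_runner is expected to unwrap an InteractiveRunner to its
# underlying runner, falling back to DirectRunner when none is specified.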
class PipelineUtilTest(unittest.TestCase):
def test_detect_pipeline_underlying_runner(self):
p = beam.Pipeline(InteractiveRunner(underlying_runner=FlinkRunner()))
pipeline_runner = utils.detect_pipeline_runner(p)
self.assertIsInstance(pipeline_runner, FlinkRunner)
def test_detect_pipeline_no_underlying_runner(self):
p = beam.Pipeline(InteractiveRunner())
pipeline_runner = utils.detect_pipeline_runner(p)
from apache_beam.runners.direct.direct_runner import DirectRunner
self.assertIsInstance(pipeline_runner, DirectRunner)
def test_detect_pipeline_no_runner(self):
pipeline_runner = utils.detect_pipeline_runner(None)
self.assertIsNone(pipeline_runner)
if __name__ == '__main__':
unittest.main()