# blob: 616d250364bf61d68aeccecf4f9df9bc86514d13 [file] [log] [blame]
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
from __future__ import absolute_import
import json
import logging
import sys
import unittest
import numpy as np
import pandas as pd
from apache_beam.runners.interactive import interactive_environment as ie
from apache_beam.runners.interactive import utils
from apache_beam.utils.windowed_value import WindowedValue
# TODO(BEAM-8288): clean up this workaround for nose tests running on Python 2,
# which lacks the unittest.mock module.
try:
from unittest.mock import patch
except ImportError:
from mock import patch
class ParseToDataframeTest(unittest.TestCase):
  """Covers utils.elements_to_df when given WindowedValue elements."""
  def _in_global_window(self, values):
    """Wraps each value in a WindowedValue at timestamp 1 in the global window.

    Every element gets its own GlobalWindow instance.
    """
    from apache_beam.transforms.window import GlobalWindow
    return [WindowedValue(value, 1, [GlobalWindow()]) for value in values]

  def test_parse_windowedvalue(self):
    """WindowedValues are unwrapped and no window-info columns are added.
    """
    elements = self._in_global_window([('a', 2), ('b', 3)])
    expected = pd.DataFrame([['a', 2], ['b', 3]], columns=[0, 1])
    actual = utils.elements_to_df(elements, include_window_info=False)
    # check_like ignores the ordering of the index and the columns.
    pd.testing.assert_frame_equal(actual, expected, check_like=True)

  def test_parse_windowedvalue_with_window_info(self):
    """Window metadata becomes event_time, windows and pane_info columns.
    """
    elements = self._in_global_window([('a', 2), ('b', 3)])
    actual = utils.elements_to_df(elements, include_window_info=True)
    # Each expected row is the tuple payload followed by the window metadata;
    # timestamp 1 is expected to surface as an event_time of int(1e6).
    expected = pd.DataFrame(
        [
            list(wv.value) + [int(1e6), wv.windows, wv.pane_info]
            for wv in elements
        ],
        columns=[0, 1, 'event_time', 'windows', 'pane_info'])
    # check_like ignores the ordering of the index and the columns.
    pd.testing.assert_frame_equal(actual, expected, check_like=True)

  def test_parse_windowedvalue_with_dicts(self):
    """Dict payloads become columns; keys absent from an element become NaN.
    """
    elements = self._in_global_window([
        {'b': 2, 'd': 4},
        {'a': 1, 'b': 2, 'c': 3},
    ])
    actual = utils.elements_to_df(elements, include_window_info=True)
    first, second = elements
    expected = pd.DataFrame(
        [[np.nan, 2, np.nan, 4, int(1e6), first.windows, first.pane_info],
         [1, 2, 3, np.nan, int(1e6), second.windows, second.pane_info]],
        columns=['a', 'b', 'c', 'd', 'event_time', 'windows', 'pane_info'])
    # check_like ignores the ordering of the index and the columns.
    pd.testing.assert_frame_equal(actual, expected, check_like=True)
@unittest.skipIf(
    not ie.current_env().is_interactive_ready,
    '[interactive] dependency is not installed.')
@unittest.skipIf(
    sys.version_info < (3, 6), 'The tests require at least Python 3.6 to work.')
class IPythonLogHandlerTest(unittest.TestCase):
  """Tests for utils.register_ipython_log_handler and IPythonLogHandler."""
  def setUp(self):
    utils.register_ipython_log_handler()
    self._interactive_root_logger = logging.getLogger(
        'apache_beam.runners.interactive')

  def test_ipython_log_handler_not_double_registered(self):
    # setUp has already registered the handler once; registering it again must
    # not attach a second IPythonLogHandler to the interactive "root" logger.
    utils.register_ipython_log_handler()
    ipython_log_handlers = [
        handler for handler in self._interactive_root_logger.handlers
        if isinstance(handler, utils.IPythonLogHandler)
    ]
    self.assertEqual(1, len(ipython_log_handlers))

  @patch('apache_beam.runners.interactive.utils.IPythonLogHandler.emit')
  def test_default_logging_level_is_info(self, mock_emit):
    # By default, loggers and log handlers have the level NOTSET, and loggers
    # propagate records to their ancestors. In that case, records from child
    # loggers are filtered by the level of the interactive "root" logger, which
    # is set to INFO. They are then handled by its sole handler,
    # IPythonLogHandler, which is set to NOTSET. As a result, everything at
    # INFO level or above is logged through IPython.core.display to all
    # frontends connected to the current kernel.
    dummy_logger = logging.getLogger('apache_beam.runners.interactive.dummy1')
    dummy_logger.info('info')
    mock_emit.assert_called_once()
    dummy_logger.debug('debug')
    # DEBUG is below the effective INFO level, so emit is still called once.
    mock_emit.assert_called_once()

  @patch('apache_beam.runners.interactive.utils.IPythonLogHandler.emit')
  def test_child_module_logger_can_override_logging_level(self, mock_emit):
    # When a child logger's level is set to something other than NOTSET, the
    # child's own level decides which records are created, instead of the
    # level inherited from the interactive "root" logger. Records that pass
    # the child's level still propagate to the root's handlers.
    dummy_logger = logging.getLogger('apache_beam.runners.interactive.dummy2')
    # logging.getLogger returns the same process-wide instance for a name, so
    # restore its level afterwards to keep this override out of other tests.
    self.addCleanup(dummy_logger.setLevel, dummy_logger.level)
    dummy_logger.setLevel(logging.DEBUG)
    mock_emit.assert_not_called()
    dummy_logger.debug('debug')
    # The child logger is now configured to log at DEBUG level, so DEBUG
    # records reach the interactive "root" logger's handler.
    mock_emit.assert_called_once()
    # At CRITICAL level, the child logger only creates (and therefore only
    # propagates) CRITICAL records.
    dummy_logger.setLevel(logging.CRITICAL)
    # ERROR records are now dropped by the child logger.
    dummy_logger.error('error')
    # Emit is not called, so it has still been called only once.
    mock_emit.assert_called_once()
@unittest.skipIf(
    not ie.current_env().is_interactive_ready,
    '[interactive] dependency is not installed.')
@unittest.skipIf(
    sys.version_info < (3, 6), 'The tests require at least Python 3.6 to work.')
class ProgressIndicatorTest(unittest.TestCase):
  """Checks what utils.progress_indicated displays before and after a call."""
  def setUp(self):
    # Each test starts from a fresh interactive environment.
    ie.new_env()

  @patch('IPython.core.display.display')
  def test_progress_in_plain_text_when_not_in_notebook(self, display_mock):
    ie.current_env()._is_in_notebook = False
    display_mock.assert_not_called()

    @utils.progress_indicated
    def _work():
      # While the wrapped function runs, the start message must be the most
      # recent thing displayed.
      display_mock.assert_called_with('Processing...')

    _work()
    # Once the call returns, the completion message has been displayed.
    display_mock.assert_called_with('Done.')

  # patch decorators are applied bottom-up, so the mocks arrive in reverse
  # order of the decorator list.
  @patch('IPython.core.display.HTML')
  @patch('IPython.core.display.Javascript')
  @patch('IPython.core.display.display')
  @patch('IPython.core.display.display_javascript')
  def test_progress_in_HTML_JS_when_in_notebook(
      self, display_javascript_mock, display_mock, javascript_mock, html_mock):
    ie.current_env()._is_in_notebook = True
    display_mock.assert_not_called()
    display_javascript_mock.assert_not_called()

    @utils.progress_indicated
    def _work():
      # While running, exactly one HTML indicator has been built and shown.
      display_mock.assert_called_once()
      html_mock.assert_called_once()

    _work()
    # After completion, exactly one Javascript snippet was built and shown.
    display_javascript_mock.assert_called_once()
    javascript_mock.assert_called_once()
@unittest.skipIf(
    not ie.current_env().is_interactive_ready,
    '[interactive] dependency is not installed.')
@unittest.skipIf(
    sys.version_info < (3, 6), 'The tests require at least Python 3.6 to work.')
class MessagingUtilTest(unittest.TestCase):
  """Tests for the messaging helpers in utils."""
  # Nested, mixed-type payload used to exercise JSON serialization.
  SAMPLE_DATA = {'a': [1, 2, 3], 'b': 4, 'c': '5', 'd': {'e': 'f'}}

  def setUp(self):
    ie.new_env()

  def test_as_json_decorator(self):
    @utils.as_json
    def dummy():
      return MessagingUtilTest.SAMPLE_DATA

    # Round trip: the decorated function's output must decode back to the
    # original data. Dict equality ignores key order, so this check does not
    # depend on how keys are ordered in the serialized output.
    self.assertEqual(json.loads(dummy()), MessagingUtilTest.SAMPLE_DATA)
if __name__ == '__main__':
  # When this module is executed directly as a script, run every test case
  # defined in it.
  unittest.main()