blob: c68f6b1dc3ac968898f4937b1054063dc3f4b5d3 [file] [log] [blame]
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import sys
from os import path
# Add modules to the pythonpath.
sys.path.append(path.dirname(path.dirname(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))))
# Add modules/utilities to the pythonpath.
sys.path.append(path.dirname(path.dirname(path.dirname(path.dirname(path.abspath(__file__))))))
import unittest
from mock import *
import plpy_mock as plpy
m4_changequote(`<!', `!>')
class AnyStringWith(str):
    """String matcher that compares equal to any value containing it.

    Intended for mock call assertions such as ``assert_called_with``: the
    expected argument is wrapped in ``AnyStringWith`` so the assertion
    succeeds when that fragment occurs anywhere in the actual argument.
    """

    def __eq__(self, other):
        """Return True when this fragment is contained in ``other``."""
        try:
            return self in other
        except TypeError:
            # ``other`` does not support membership tests (for example None
            # or an int), so it cannot contain the fragment.
            return False

    def __ne__(self, other):
        """Return the negation of ``__eq__``.

        Python 2 does not derive ``!=`` from ``__eq__``; without this the
        inherited ``str.__ne__`` would disagree with ``==``.
        """
        return not self.__eq__(other)
class MiniBatchPreProcessingTestCase(unittest.TestCase):
    """Unit tests for ``MiniBatchPreProcessor``.

    ``plpy`` and ``utilities.mean_std_dev_calculator`` are replaced in
    ``sys.modules`` before the module under test is imported, and the
    module-level helpers it uses are swapped for mocks in ``setUp``.
    """

    # Marker for "argument not supplied". ``None`` cannot play that role
    # because a ``None`` buffer size is one of the cases tested here.
    _DEFAULT = object()

    def setUp(self):
        """Install the module patches and build a default preprocessor."""
        patches = {
            'plpy': plpy,
            'utilities.mean_std_dev_calculator': Mock(),
        }
        # we need to use MagicMock() instead of Mock() for the plpy.execute mock
        # to be able to iterate on the return value
        self.plpy_mock_execute = MagicMock()
        plpy.execute = self.plpy_mock_execute

        self.module_patcher = patch.dict('sys.modules', patches)
        self.module_patcher.start()
        # unittest skips tearDown() when setUp() raises, so the patch is
        # undone through a cleanup to keep sys.modules from leaking into
        # other tests if the import or the constructor below fails.
        self.addCleanup(self.module_patcher.stop)

        self.default_schema_madlib = "madlib"
        self.default_source_table = "source"
        self.default_output_table = "output"
        self.default_dep_var = "depvar"
        self.default_ind_var = "indvar"
        self.grouping_cols = None
        self.default_buffer_size = 5

        # Imported here, after the patch is active, so the module under test
        # resolves its own imports against the mocked entries.
        import utilities.minibatch_preprocessing
        self.module = utilities.minibatch_preprocessing
        self.module.validate_module_input_params = Mock()
        self.output_tbl_valid_mock = Mock()
        self.module.output_tbl_valid = self.output_tbl_valid_mock
        self.module.MiniBatchStandardizer = Mock()
        self.module.get_expr_type = MagicMock(return_value="character")
        self.module.get_distinct_col_levels = Mock(return_value=[0, 22, 100])

        self.subject = self._make_preprocessor()

    def _make_preprocessor(self, source_table=_DEFAULT, output_table=_DEFAULT,
                           dep_var=_DEFAULT, buffer_size=_DEFAULT, **kwargs):
        """Construct a ``MiniBatchPreProcessor`` from the setUp defaults.

        Any argument left at ``_DEFAULT`` is taken from the matching
        ``self.default_*`` attribute. Extra keyword arguments are forwarded
        to the constructor unchanged. Positional order matches the calls
        this helper replaces.
        """
        def choose(value, fallback):
            return fallback if value is self._DEFAULT else value

        return self.module.MiniBatchPreProcessor(
            self.default_schema_madlib,
            choose(source_table, self.default_source_table),
            choose(output_table, self.default_output_table),
            choose(dep_var, self.default_dep_var),
            self.default_ind_var,
            self.grouping_cols,
            choose(buffer_size, self.default_buffer_size),
            **kwargs)

    def test_minibatch_preprocessor_executes_query(self):
        # Smoke test: passes as long as no exception is raised.
        preprocessor_obj = self._make_preprocessor(source_table="input",
                                                   output_table="out")
        preprocessor_obj.minibatch_preprocessor()

    def test_minibatch_preprocessor_null_buffer_size_executes_query(self):
        preprocessor_obj = self._make_preprocessor(source_table="input",
                                                   output_table="out",
                                                   buffer_size=None)
        self.module.MiniBatchBufferSizeCalculator.calculate_default_buffer_size = Mock()
        preprocessor_obj.minibatch_preprocessor()

    def test_minibatch_preprocessor_multiple_dep_var_raises_exception(self):
        with self.assertRaises(plpy.PLPYException):
            self._make_preprocessor(dep_var="y1,y2")

    def test_minibatch_preprocessor_buffer_size_zero_fails(self):
        with self.assertRaises(plpy.PLPYException):
            self._make_preprocessor(buffer_size=0)

    def test_minibatch_preprocessor_buffer_size_one_passes(self):
        # No explicit assertion is needed: an unexpected exception makes the
        # test fail by itself.
        preprocessor_obj = self._make_preprocessor(buffer_size=1)
        preprocessor_obj.minibatch_preprocessor()

    def test_get_dep_var_array_expr_array_type(self):
        self.module.get_expr_type = MagicMock(return_value="integer[]")
        obj = self._make_preprocessor()
        dep_var_array_expr = obj.get_dep_var_array_expr()
        self.assertEqual(self.default_dep_var, dep_var_array_expr)

    def test_get_dep_var_array_expr_numeric_type(self):
        # Integer dependent variable with one-hot encoding requested.
        self.module.get_expr_type = MagicMock(return_value="integer")
        self.module.get_distinct_col_levels = Mock(return_value=[0, 22, 100])
        obj = self._make_preprocessor(one_hot_encode_int_dep_var=True)
        dep_var_array_expr = obj.get_dep_var_array_expr()
        self.assertEqual("array[({0}) = 0, ({0}) = 22, ({0}) = 100]::integer[]".
                         format(self.default_dep_var),
                         dep_var_array_expr.lower())

        # Numeric dependent variable without one-hot encoding.
        self.module.get_expr_type = MagicMock(return_value="numeric")
        obj = self._make_preprocessor(one_hot_encode_int_dep_var=False)
        dep_var_array_expr = obj.get_dep_var_array_expr()
        self.assertEqual("array[({0})]".format(self.default_dep_var),
                         dep_var_array_expr.lower())

    def test_get_dep_var_array_expr_text_type(self):
        self.module.get_expr_type = MagicMock(return_value="character")
        self.module.get_distinct_col_levels = Mock(return_value=["'a'", "'b'", "'c'"])
        obj = self._make_preprocessor()
        dep_var_array_expr = obj.get_dep_var_array_expr()
        self.assertEqual("array[({0}) = 'a', ({0}) = 'b', ({0}) = 'c']::integer[]".
                         format(self.default_dep_var),
                         dep_var_array_expr.lower())

    def test_get_dep_var_array_expr_bool_type(self):  # TODO check for boolean types
        self.module.get_expr_type = MagicMock(return_value="boolean")
        self.module.get_distinct_col_levels = Mock(return_value=['True', False])
        obj = self._make_preprocessor()
        dep_var_array_expr = obj.get_dep_var_array_expr()
        self.assertEqual("array[({0}) = True, ({0}) = False]::integer[]".
                         format(self.default_dep_var).lower(),
                         dep_var_array_expr.lower())

    def test_get_dep_var_array_expr_other_type(self):
        # Both a scalar and an array of an unrecognised type must be rejected.
        self.module.get_expr_type = MagicMock(return_value="other")
        with self.assertRaises(plpy.PLPYException):
            self._make_preprocessor()

        self.module.get_expr_type = MagicMock(return_value="other[]")
        with self.assertRaises(plpy.PLPYException):
            self._make_preprocessor()

    def test_get_indep_var_array_expr_passes(self):
        ind_var_array_str = self.subject.get_indep_var_array_expr()
        self.assertEqual("({0})::double precision[]".format(self.default_ind_var),
                         ind_var_array_str.lower())
class MiniBatchQueryStandardizerTestCase(unittest.TestCase):
    """Unit tests for ``MiniBatchStandardizer``.

    ``utilities.mean_std_dev_calculator`` is replaced by a mock whose
    ``MeanStdDevCalculator`` instances report a fixed mean and standard
    deviation, and ``plpy.execute`` is replaced by a ``MagicMock``.
    """

    def setUp(self):
        """Install the module patches and build an ungrouped standardizer."""
        self.default_source_table = "source"
        self.default_dep_var = "depvar"
        self.default_ind_var = "indvar"
        self.default_schema = "schema"

        self.mean_std_calculator_mock = Mock()
        patches = {
            'plpy': plpy,
            'utilities.mean_std_dev_calculator': self.mean_std_calculator_mock
        }
        self.x_mean = "5678"
        self.x_std_dev = "4.789"
        # Every MeanStdDevCalculator(...) instance returns the fixed pair.
        calculator = self.mean_std_calculator_mock.MeanStdDevCalculator.return_value
        calculator.get_mean_and_std_dev_for_ind_var = \
            Mock(return_value=(self.x_mean, self.x_std_dev))

        # we need to use MagicMock() instead of Mock() for the plpy.execute mock
        # to be able to iterate on the return value
        self.plpy_mock_execute = MagicMock()
        plpy.execute = self.plpy_mock_execute

        self.module_patcher = patch.dict('sys.modules', patches)
        self.module_patcher.start()
        # unittest skips tearDown() when setUp() raises, so the patch is
        # undone through a cleanup to keep sys.modules from leaking into
        # other tests if the import or the constructor below fails.
        self.addCleanup(self.module_patcher.stop)

        # Imported here, after the patch is active, so the module under test
        # resolves its own imports against the mocked entries.
        import utilities.minibatch_preprocessing
        self.module = utilities.minibatch_preprocessing
        self.subject = self._make_standardizer(None)

    def _make_standardizer(self, grouping_cols):
        """Construct a ``MiniBatchStandardizer`` from the setUp defaults."""
        return self.module.MiniBatchStandardizer(self.default_schema,
                                                 self.default_source_table,
                                                 self.default_dep_var,
                                                 self.default_ind_var,
                                                 grouping_cols,
                                                 "out_standardization")

    def test_get_query_for_standardizing_no_grouping(self):
        self.subject.get_query_for_standardizing()
        self.assertEqual(self.x_mean, self.subject.x_mean_str)
        self.assertEqual(self.x_std_dev, self.subject.x_std_dev_str)

    def test_get_query_for_standardizing_grouping(self):
        # Smoke test: passes as long as no exception is raised.
        self.subject = self._make_standardizer("grp")
        self.subject.get_query_for_standardizing()

    def test_create_standardization_output_table_executes_query(self):
        self.subject.create_output_standardization_table()
        expected_query_substr_create_table = "CREATE TABLE out_standardization AS"
        # assert_called_with checks the most recent call, so all three
        # fragments must appear in that same query string.
        self.plpy_mock_execute.assert_called_with(AnyStringWith(
            expected_query_substr_create_table))
        self.plpy_mock_execute.assert_called_with(AnyStringWith(self.x_mean))
        self.plpy_mock_execute.assert_called_with(AnyStringWith(self.x_std_dev))
class MiniBatchBufferSizeCalculatorTestCase(unittest.TestCase):
    """Unit tests for ``MiniBatchBufferSizeCalculator.calculate_default_buffer_size``."""

    def setUp(self):
        """Install the module patches and expose the class under test."""
        patches = {
            'plpy': plpy,
            'utilities.mean_std_dev_calculator': Mock()
        }
        self.module_patcher = patch.dict('sys.modules', patches)
        self.module_patcher.start()
        # unittest skips tearDown() when setUp() raises, so the patch is
        # undone through a cleanup to keep sys.modules from leaking into
        # other tests if the import below fails.
        self.addCleanup(self.module_patcher.stop)

        # Imported here, after the patch is active, so the module under test
        # resolves its own imports against the mocked entries.
        import utilities.minibatch_preprocessing
        self.module = utilities.minibatch_preprocessing
        # The method is called on the class itself, not on an instance.
        self.subject = self.module.MiniBatchBufferSizeCalculator

    def test_calculate_default_buffer_size_non_none_buffer_size(self):
        buffer_size = self.subject.calculate_default_buffer_size(1, 3, 100)
        self.assertIsInstance(buffer_size, int)
        self.assertEqual(1, buffer_size)

    def test_calculate_default_buffer_size_none_buffer_size(self):
        self.module.get_seg_number = Mock(return_value=4)
        buffer_size = self.subject.calculate_default_buffer_size(None, 100, 1000)
        self.assertIsInstance(buffer_size, int)
        self.assertEqual(25, buffer_size)

    def test_calculate_default_buffer_size_none_buffer_size_rounds_to_int(self):
        self.module.get_seg_number = Mock(return_value=5)
        buffer_size = self.subject.calculate_default_buffer_size(None, 3, 1000)
        self.assertIsInstance(buffer_size, int)
        self.assertEqual(1, buffer_size)

    # TODO add more tests after finalizing the buffer size calculation
if __name__ == '__main__':
    # Run every test case in this file when it is executed as a script.
    unittest.main()
# ---------------------------------------------------------------------