| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import sys |
| from os import path |
| |
| # Add modules to the pythonpath. |
| sys.path.append(path.dirname(path.dirname(path.dirname(path.dirname(path.dirname(path.abspath(__file__))))))) |
| #modules/utilities to the pythonpath. |
| sys.path.append(path.dirname(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))) |
| |
| import unittest |
| from mock import * |
| import plpy_mock as plpy |
| |
| m4_changequote(`<!', `!>') |
| |
| class AnyStringWith(str): |
| def __eq__(self, other): |
| return self in other |
| |
| class MiniBatchPreProcessingTestCase(unittest.TestCase): |
| def setUp(self): |
| self.plpy_mock = Mock(spec='error') |
| patches = { |
| 'plpy': plpy, |
| 'utilities.mean_std_dev_calculator': Mock(), |
| } |
| # we need to use MagicMock() instead of Mock() for the plpy.execute mock |
| # to be able to iterate on the return value |
| self.plpy_mock_execute = MagicMock() |
| plpy.execute = self.plpy_mock_execute |
| |
| self.module_patcher = patch.dict('sys.modules', patches) |
| self.module_patcher.start() |
| |
| self.default_schema_madlib = "madlib" |
| self.default_source_table = "source" |
| self.default_output_table = "output" |
| self.default_dep_var = "depvar" |
| self.default_ind_var = "indvar" |
| self.grouping_cols = None |
| self.default_buffer_size = 5 |
| |
| import utilities.minibatch_preprocessing |
| self.module = utilities.minibatch_preprocessing |
| self.module.validate_module_input_params = Mock() |
| self.output_tbl_valid_mock = Mock() |
| self.module.output_tbl_valid = self.output_tbl_valid_mock |
| |
| self.module.MiniBatchStandardizer = Mock() |
| self.module.get_expr_type = MagicMock(return_value="character") |
| self.module.get_distinct_col_levels = Mock(return_value = [0,22,100]) |
| self.subject = self.module.MiniBatchPreProcessor(self.default_schema_madlib, |
| self.default_source_table, |
| self.default_output_table, |
| self.default_dep_var, |
| self.default_ind_var, |
| self.grouping_cols, |
| self.default_buffer_size) |
| |
| def tearDown(self): |
| self.module_patcher.stop() |
| |
| def test_minibatch_preprocessor_executes_query(self): |
| preprocessor_obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib, |
| "input", |
| "out", |
| self.default_dep_var, |
| self.default_ind_var, |
| self.grouping_cols, |
| self.default_buffer_size) |
| preprocessor_obj.minibatch_preprocessor() |
| |
| def test_minibatch_preprocessor_null_buffer_size_executes_query(self): |
| preprocessor_obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib, |
| "input", |
| "out", |
| self.default_dep_var, |
| self.default_ind_var, |
| self.grouping_cols, |
| None) |
| self.module.MiniBatchBufferSizeCalculator.calculate_default_buffer_size = Mock() |
| preprocessor_obj.minibatch_preprocessor() |
| |
| def test_minibatch_preprocessor_multiple_dep_var_raises_exception(self): |
| with self.assertRaises(plpy.PLPYException): |
| self.module.MiniBatchPreProcessor(self.default_schema_madlib, |
| self.default_source_table, |
| self.default_output_table, |
| "y1,y2", |
| self.default_ind_var, |
| self.grouping_cols, |
| self.default_buffer_size) |
| |
| def test_minibatch_preprocessor_buffer_size_zero_fails(self): |
| with self.assertRaises(plpy.PLPYException): |
| self.module.MiniBatchPreProcessor(self.default_schema_madlib, |
| self.default_source_table, |
| self.default_output_table, |
| self.default_dep_var, |
| self.default_ind_var, |
| self.grouping_cols, |
| 0) |
| |
| def test_minibatch_preprocessor_buffer_size_one_passes(self): |
| #not sure how to assert that an exception has not been raised |
| preprocessor_obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib, |
| self.default_source_table, |
| self.default_output_table, |
| self.default_dep_var, |
| self.default_ind_var, |
| self.grouping_cols, |
| 1) |
| preprocessor_obj.minibatch_preprocessor() |
| |
| def test_get_dep_var_array_expr_array_type(self): |
| self.module.get_expr_type = MagicMock(return_value="integer[]") |
| obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib, |
| self.default_source_table, |
| self.default_output_table, |
| self.default_dep_var, |
| self.default_ind_var, |
| self.grouping_cols, |
| self.default_buffer_size) |
| dep_var_array_expr = obj.get_dep_var_array_expr() |
| |
| self.assertEqual(self.default_dep_var, dep_var_array_expr) |
| |
| def test_get_dep_var_array_expr_numeric_type(self): |
| self.module.get_expr_type = MagicMock(return_value="integer") |
| self.module.get_distinct_col_levels = Mock(return_value = [0,22,100]) |
| obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib, |
| self.default_source_table, |
| self.default_output_table, |
| self.default_dep_var, |
| self.default_ind_var, |
| self.grouping_cols, |
| self.default_buffer_size, |
| one_hot_encode_int_dep_var=True) |
| dep_var_array_expr = obj.get_dep_var_array_expr() |
| self.assertEqual("array[({0}) = 0, ({0}) = 22, ({0}) = 100]::integer[]". |
| format(self.default_dep_var), |
| dep_var_array_expr.lower()) |
| |
| self.module.get_expr_type = MagicMock(return_value="numeric") |
| obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib, |
| self.default_source_table, |
| self.default_output_table, |
| self.default_dep_var, |
| self.default_ind_var, |
| self.grouping_cols, |
| self.default_buffer_size, |
| one_hot_encode_int_dep_var=False) |
| dep_var_array_expr = obj.get_dep_var_array_expr() |
| self.assertEqual("array[({0})]".format(self.default_dep_var), |
| dep_var_array_expr.lower()) |
| |
| |
| def test_get_dep_var_array_expr_text_type(self): |
| self.module.get_expr_type = MagicMock(return_value="character") |
| self.module.get_distinct_col_levels = Mock(return_value = ["'a'","'b'","'c'"]) |
| obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib, |
| self.default_source_table, |
| self.default_output_table, |
| self.default_dep_var, |
| self.default_ind_var, |
| self.grouping_cols, |
| self.default_buffer_size) |
| dep_var_array_expr = obj.get_dep_var_array_expr() |
| self.assertEqual("array[({0}) = 'a', ({0}) = 'b', ({0}) = 'c']::integer[]". |
| format(self.default_dep_var), |
| dep_var_array_expr.lower()) |
| |
| def test_get_dep_var_array_expr_bool_type(self): #TODO check for boolean types |
| self.module.get_expr_type = MagicMock(return_value="boolean") |
| self.module.get_distinct_col_levels = Mock(return_value = ['True', False]) |
| obj = self.module.MiniBatchPreProcessor(self.default_schema_madlib, |
| self.default_source_table, |
| self.default_output_table, |
| self.default_dep_var, |
| self.default_ind_var, |
| self.grouping_cols, |
| self.default_buffer_size) |
| dep_var_array_expr = obj.get_dep_var_array_expr() |
| self.assertEqual("array[({0}) = True, ({0}) = False]::integer[]". |
| format(self.default_dep_var).lower(), |
| dep_var_array_expr.lower()) |
| |
| |
| def test_get_dep_var_array_expr_other_type(self): |
| self.module.get_expr_type = MagicMock(return_value="other") |
| with self.assertRaises(plpy.PLPYException): |
| self.module.MiniBatchPreProcessor(self.default_schema_madlib, |
| self.default_source_table, |
| self.default_output_table, |
| self.default_dep_var, |
| self.default_ind_var, |
| self.grouping_cols, |
| self.default_buffer_size) |
| self.module.get_expr_type = MagicMock(return_value="other[]") |
| with self.assertRaises(plpy.PLPYException): |
| self.module.MiniBatchPreProcessor(self.default_schema_madlib, |
| self.default_source_table, |
| self.default_output_table, |
| self.default_dep_var, |
| self.default_ind_var, |
| self.grouping_cols, |
| self.default_buffer_size) |
| |
| def test_get_indep_var_array_expr_passes(self): |
| ind_var_array_str = self.subject.get_indep_var_array_expr() |
| self.assertEqual("({0})::double precision[]".format(self.default_ind_var), ind_var_array_str.lower()) |
| |
| class MiniBatchQueryStandardizerTestCase(unittest.TestCase): |
| def setUp(self): |
| self.default_source_table = "source" |
| self.default_dep_var = "depvar" |
| self.default_ind_var = "indvar" |
| self.default_schema = "schema" |
| self.mean_std_calculator_mock = Mock() |
| patches = { |
| 'plpy': plpy, |
| 'utilities.mean_std_dev_calculator': self.mean_std_calculator_mock |
| } |
| self.x_mean = "5678" |
| self.x_std_dev = "4.789" |
| self.mean_std_calculator_mock.MeanStdDevCalculator.return_value.\ |
| get_mean_and_std_dev_for_ind_var = \ |
| Mock(return_value=(self.x_mean, self.x_std_dev)) |
| |
| # we need to use MagicMock() instead of Mock() for the plpy.execute mock |
| # to be able to iterate on the return value |
| self.plpy_mock_execute = MagicMock() |
| plpy.execute = self.plpy_mock_execute |
| |
| self.module_patcher = patch.dict('sys.modules', patches) |
| self.module_patcher.start() |
| |
| import utilities.minibatch_preprocessing |
| self.module = utilities.minibatch_preprocessing |
| self.subject = self.module.MiniBatchStandardizer(self.default_schema, |
| self.default_source_table, |
| self.default_dep_var, |
| self.default_ind_var, |
| None, |
| "out_standardization") |
| |
| def tearDown(self): |
| self.module_patcher.stop() |
| |
| def test_get_query_for_standardizing_no_grouping(self): |
| self.subject.get_query_for_standardizing() |
| self.assertEqual(self.x_mean, self.subject.x_mean_str) |
| self.assertEqual(self.x_std_dev, self.subject.x_std_dev_str) |
| |
| def test_get_query_for_standardizing_grouping(self): |
| self.subject = self.module.MiniBatchStandardizer(self.default_schema, |
| self.default_source_table, |
| self.default_dep_var, |
| self.default_ind_var, |
| "grp", |
| "out_standardization") |
| self.subject.get_query_for_standardizing() |
| |
| def test_create_standardization_output_table_executes_query(self): |
| self.subject.create_output_standardization_table() |
| expected_query_substr_create_table = "CREATE TABLE out_standardization AS" |
| self.plpy_mock_execute.assert_called_with(AnyStringWith( |
| expected_query_substr_create_table)) |
| self.plpy_mock_execute.assert_called_with(AnyStringWith(self.x_mean)) |
| self.plpy_mock_execute.assert_called_with(AnyStringWith(self.x_std_dev)) |
| |
| class MiniBatchBufferSizeCalculatorTestCase(unittest.TestCase): |
| def setUp(self): |
| patches = { |
| 'plpy': plpy, |
| 'utilities.mean_std_dev_calculator': Mock() |
| } |
| self.a = 'a' |
| self.module_patcher = patch.dict('sys.modules', patches) |
| self.module_patcher.start() |
| import utilities.minibatch_preprocessing |
| self.module = utilities.minibatch_preprocessing |
| self.subject = self.module.MiniBatchBufferSizeCalculator |
| |
| def tearDown(self): |
| self.module_patcher.stop() |
| |
| def test_calculate_default_buffer_size_non_none_buffer_size(self): |
| buffer_size = self.subject.calculate_default_buffer_size(1, 3, 100) |
| self.assertTrue(isinstance(buffer_size, int)) |
| self.assertEqual(1, buffer_size) |
| |
| def test_calculate_default_buffer_size_none_buffer_size(self): |
| self.module.get_seg_number = Mock(return_value = 4) |
| buffer_size = self.subject.calculate_default_buffer_size(None, 100, 1000) |
| self.assertTrue(isinstance(buffer_size, int)) |
| self.assertEqual(25, buffer_size) |
| |
| def test_calculate_default_buffer_size_none_buffer_size_rounds_to_int(self): |
| self.module.get_seg_number = Mock(return_value = 5) |
| buffer_size = self.subject.calculate_default_buffer_size(None, 3, 1000) |
| self.assertTrue(isinstance(buffer_size, int)) |
| self.assertEqual(1, buffer_size) |
| |
| #TODO add more tests after finalizing the buffer size calculation |
| |
| if __name__ == '__main__': |
| unittest.main() |
| |
| # --------------------------------------------------------------------- |