| # coding=utf-8 |
| # |
| # Licensed to the Apache Software Foundation (ASF) under one |
| # or more contributor license agreements. See the NOTICE file |
| # distributed with this work for additional information |
| # regarding copyright ownership. The ASF licenses this file |
| # to you under the Apache License, Version 2.0 (the |
| # "License"); you may not use this file except in compliance |
| # with the License. You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, |
| # software distributed under the License is distributed on an |
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| # KIND, either express or implied. See the License for the |
| # specific language governing permissions and limitations |
| # under the License. |
| |
| import sys |
| from os import path |
| from itertools import repeat |
| |
| # Add the modules directory (four levels above this file) to the Python path. |
| sys.path.append(path.dirname(path.dirname(path.dirname(path.dirname(path.abspath(__file__)))))) |
| |
| import unittest |
| from mock import * |
| import plpy_mock as plpy |
| |
| m4_changequote(`<!', `!>') |
| |
class VecColsHelperTestSuite(unittest.TestCase):
    """Tests for ``vec_cols_helper.get_cols_as_list``.

    Each test calls ``get_cols_as_list`` with a column specification, a table
    argument and an optional exclusion list, then checks the returned list.
    Where '*' is used, the module-level ``get_cols`` is replaced with a Mock
    that returns a fixed list of column names.
    """

    def setUp(self):
        # Every test gets its own mock for plpy.execute.
        self.plpy_mock_execute = MagicMock()
        plpy.execute = self.plpy_mock_execute

        # Make ``import plpy`` resolve to the mock module while the module
        # under test is being imported.
        self.module_patcher = patch.dict('sys.modules', {'plpy': plpy})
        self.module_patcher.start()

        # Imported here rather than at file level because the import must
        # happen after the sys.modules patch is active.
        import utilities.transform_vec_cols
        self.module = utilities.transform_vec_cols
        self.subject = self.module.vec_cols_helper()

        self.default_source_table = "source"
        self.does_not_matter = 'does_not_matter'
        self.default_output_table = "output"

    def tearDown(self):
        # Undo the sys.modules patch started in setUp.
        self.module_patcher.stop()

    def test_get_cols_as_list_null_cols_to_output(self):
        """A None column specification yields an empty list."""
        actual = self.subject.get_cols_as_list(None, self.does_not_matter, 'c')
        self.assertEqual([], actual)

    def test_get_cols_as_list_star_cols_to_output_without_others(self):
        """'*' with no table argument (omitted or None) yields an empty list."""
        self.module.get_cols = Mock(return_value=['a', 'b', 'c'])

        table_omitted = self.subject.get_cols_as_list('*')
        self.assertEqual([], table_omitted)

        table_is_none = self.subject.get_cols_as_list('*', None, 'foo')
        self.assertEqual([], table_is_none)

    def test_get_cols_as_list_star_cols_to_output_with_others(self):
        """'*' returns the mocked table columns minus the excluded one."""
        self.module.get_cols = Mock(return_value=['a', 'b', 'c'])
        actual = self.subject.get_cols_as_list('*', self.does_not_matter, 'c')
        self.assertEqual(['a', 'b'], actual)

    def test_get_cols_as_list_exclude_single_col(self):
        """An explicit column list has the single excluded column removed."""
        scenarios = (
            ('a', ['a']),
            ('a, b', ['a', 'b']),
            ('a, b, c', ['a', 'b']),
        )
        for requested, expected in scenarios:
            actual = self.subject.get_cols_as_list(
                requested, self.does_not_matter, 'c')
            self.assertEqual(expected, actual)

    def test_get_cols_as_list_exclude_comma_sep_col_list(self):
        """The exclusion argument may itself be a comma-separated list."""
        self.module.get_cols = Mock(return_value=['a', 'b', 'c', 'd'])
        scenarios = (
            ("*", 'a, b', ['c', 'd']),
            ("a, b, c", 'b, c', ['a']),
            # 'e' is not among the requested columns; only 'c' is dropped.
            ("a, b, c", 'c, e', ['a', 'b']),
        )
        for requested, excluded, expected in scenarios:
            actual = self.subject.get_cols_as_list(
                requested, self.does_not_matter, excluded)
            self.assertEqual(expected, actual)

    def test_get_cols_as_list_all_cols_to_output(self):
        """'*' with no exclusion argument returns every mocked column."""
        self.module.get_cols = Mock(return_value=['a', 'b', 'c'])
        actual = self.subject.get_cols_as_list('*', self.does_not_matter)
        self.assertEqual(['a', 'b', 'c'], actual)

    def test_get_cols_as_list_exclude_none_with_star(self):
        """A None or empty-string exclusion leaves the '*' result untouched."""
        self.module.get_cols = Mock(return_value=['a', 'b', 'c'])
        for nothing_excluded in (None, ""):
            actual = self.subject.get_cols_as_list(
                "*", self.does_not_matter, nothing_excluded)
            self.assertEqual(['a', 'b', 'c'], actual)
| |
class Vec2ColsTestSuite(unittest.TestCase):
    """Tests for the ``vec2cols`` helpers.

    Covers ``get_names_for_split_output_cols`` and ``validate_output_cols``.
    ``plpy.execute`` is replaced with a MagicMock so no database is needed.
    """

    def setUp(self):
        # Every test gets its own mock for plpy.execute.
        self.plpy_mock_execute = MagicMock()
        plpy.execute = self.plpy_mock_execute

        # Make ``import plpy`` resolve to the mock module while the module
        # under test is being imported.
        self.module_patcher = patch.dict('sys.modules', {'plpy': plpy})
        self.module_patcher.start()

        import utilities.transform_vec_cols
        self.module = utilities.transform_vec_cols
        self.subject = self.module.vec2cols()

        self.default_source_table = "source"
        self.does_not_matter = 'does_not_matter'
        self.default_output_table = "output"

        # Attach stand-in validators to the subject instance.
        for validator in ('input_tbl_valid',
                          'output_tbl_valid',
                          'cols_in_tbl_valid'):
            setattr(self.subject, validator, Mock())

    def tearDown(self):
        # Undo the sys.modules patch started in setUp.
        self.module_patcher.stop()

    def test_get_names_for_split_output_cols_feature_names_none(self):
        """With no feature names given, columns are named f1..fN."""
        # The mocked query result reports an array length of 3.
        self.plpy_mock_execute.return_value = [{"n_x": 3}]
        generated = self.subject.get_names_for_split_output_cols(
            self.default_source_table, 'foobar')
        self.assertEqual(['f1', 'f2', 'f3'], generated)

    def test_get_names_for_split_output_cols_array_upper_returns_none(self):
        """A None array length from the mocked query raises PLPYException."""
        self.plpy_mock_execute.return_value = [{"n_x": None}]
        with self.assertRaises(plpy.PLPYException):
            self.subject.get_names_for_split_output_cols(
                self.default_source_table, 'foobar')

    def test_validate_output_cols_max_cols_exception(self):
        """1550 + 100 output columns is rejected."""
        too_many_first = range(1550)
        too_many_second = range(1550, 1650)
        with self.assertRaises(plpy.PLPYException):
            self.subject.validate_output_cols(too_many_first, too_many_second)

    def test_validate_output_cols_duplicate_cols_exception(self):
        """Names that appear in both lists are rejected."""
        overlapping = (
            (['a'], ['a']),
            (['a', 'b', 'c'], ['a', 'b']),
        )
        for first, second in overlapping:
            with self.assertRaises(plpy.PLPYException):
                self.subject.validate_output_cols(first, second)

    def test_validate_output_cols_passes(self):
        """1450 + 50 columns and disjoint name lists are both accepted."""
        self.subject.validate_output_cols(range(1450), range(1450, 1500))
        self.subject.validate_output_cols(['a', 'b', 'c'], ['d', 'e'])
| |
class Cols2VecTestSuite(unittest.TestCase):
    """Tests for the ``cols2vec`` argument and feature-type validation.

    ``plpy.execute`` is replaced with a MagicMock, and the module-level table
    and column validators are replaced with Mocks, so no database is needed.
    """

    def setUp(self):
        patches = {
            'plpy': plpy
        }
        # Every test gets its own mock for plpy.execute.
        self.plpy_mock_execute = MagicMock()
        plpy.execute = self.plpy_mock_execute

        # Make ``import plpy`` resolve to the mock module while the module
        # under test is being imported.
        self.module_patcher = patch.dict('sys.modules', patches)
        self.module_patcher.start()

        import utilities.transform_vec_cols
        self.module = utilities.transform_vec_cols
        self.subject = self.module.cols2vec()

        self.default_source_table = "source"
        self.does_not_matter = 'does_not_matter'
        self.default_output_table = "output"

        # Replace the module-level validators with stand-ins.
        self.module.input_tbl_valid = Mock()
        self.module.output_tbl_valid = Mock()
        self.module.cols_in_tbl_valid = Mock()

    def tearDown(self):
        # Undo the sys.modules patch started in setUp.
        self.module_patcher.stop()

    def test_validate_args_null_list_of_features_exception(self):
        """A None or blank third argument raises PLPYException."""
        with self.assertRaises(plpy.PLPYException):
            self.subject.validate_args(self.does_not_matter,
                                       self.does_not_matter,
                                       None,
                                       self.does_not_matter,
                                       self.does_not_matter)
        with self.assertRaises(plpy.PLPYException):
            self.subject.validate_args(self.does_not_matter,
                                       self.does_not_matter,
                                       " ",
                                       self.does_not_matter,
                                       self.does_not_matter)

    def test_validate_args_star_features_to_exclude_exception(self):
        """A '*' (with surrounding blanks) as fourth argument raises."""
        with self.assertRaises(plpy.PLPYException):
            self.subject.validate_args(self.does_not_matter,
                                       self.does_not_matter,
                                       self.does_not_matter,
                                       ' * ',
                                       self.does_not_matter)

    def test_get_and_validate_feature_types_with_array_list_of_features_exception(self):
        """Any array-typed feature column raises PLPYException.

        The column types come from the module-level ``get_cols_and_types``,
        so that stub is replaced for every scenario. The earlier version only
        stubbed it once and then altered ``plpy.execute``'s return value,
        which meant all three assertions ran against the first scenario.
        """
        self.subject.features_to_nest = ['a', 'b']
        array_type_scenarios = (
            ['s1', 's1[]'],    # second column is an array
            ['s1[]', 's1[]'],  # both columns are arrays
            ['s1[]', 's1'],    # first column is an array
        )
        for col_types in array_type_scenarios:
            self.module.get_cols_and_types = Mock(
                return_value=list(zip(['a', 'b'], col_types)))
            with self.assertRaises(plpy.PLPYException):
                self.subject.get_and_validate_feature_types(self.does_not_matter)

    def test_get_and_validate_feature_types_nonstar_with_scalar_list_of_features(self):
        """Scalar columns of the same type pass validation without raising."""
        self.module.get_cols_and_types = Mock(
            return_value=list(zip(['a', 'b'], ['s1', 's1'])))
        self.subject.features_to_nest = ['a', 'b']
        self.subject.get_and_validate_feature_types(self.does_not_matter)
| |
| # Run every test suite in this file when it is executed directly as a script. |
| if __name__ == '__main__': |
| unittest.main() |