blob: 268d7e968a6dd6eca5ebaf2a34ad9a8f2c536a11 [file] [log] [blame]
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import StringIO
import sys
from unittest import TestCase
from mock.mock import patch, MagicMock
from only_for_platform import not_for_platform, PLATFORM_WINDOWS
from ambari_commons.os_check import OSCheck, OSConst
utils = __import__('ambari_server.utils').utils
@not_for_platform(PLATFORM_WINDOWS)
class TestUtils(TestCase):
@patch.object(OSCheck, "get_os_family")
@patch('os.listdir')
@patch('os.path.isdir')
def test_get_ubuntu_pg_version(self, path_isdir_mock, os_listdir_mock, get_os_family_mock):
get_os_family_mock.return_value = OSConst.UBUNTU_FAMILY
path_isdir_mock.return_value = True
os_listdir_mock.return_value = ['8.4', '9.1']
self.assertEqual('9.1', utils.get_ubuntu_pg_version())
@patch.object(OSCheck, "is_suse_family")
@patch.object(OSCheck, "is_ubuntu_family")
@patch.object(OSCheck, "is_redhat_family")
@patch('ambari_server.utils.get_ubuntu_pg_version')
def test_get_postgre_running_status(self, get_ubuntu_pg_version_mock, is_redhat_family, is_ubuntu_family, is_suse_family):
is_redhat_family.return_value = False
is_ubuntu_family.return_value = True
is_suse_family.return_value = False
utils.PG_STATUS_RUNNING_DEFAULT = "red_running"
get_ubuntu_pg_version_mock.return_value = '9.1'
self.assertEqual('9.1/main', utils.get_postgre_running_status())
is_redhat_family.return_value = True
is_ubuntu_family.return_value = False
is_suse_family.return_value = False
self.assertEqual('red_running', utils.get_postgre_running_status())
@patch('os.path.isfile')
def test_locate_file(self, isfile_mock):
utils.ENV_PATH = ['/test']
# File was found in the path
isfile_mock.return_value = True
self.assertEquals('/test/myfile', utils.locate_file('myfile'))
# File not found in the path
isfile_mock.return_value = False
self.assertEquals('myfile', utils.locate_file('myfile'))
# Testing default vaule
isfile_mock.return_value = False
self.assertEquals('/tmp/myfile', utils.locate_file('myfile', '/tmp'))
@patch('os.path.exists')
@patch('os.path.join')
def test_pid_exists(self, path_join_mock, path_exists_mock):
path_join_mock.return_value = '/test'
path_exists_mock.return_value = True
self.assertTrue(utils.pid_exists('1'))
@patch('time.time')
@patch('__builtin__.open')
@patch('time.sleep')
@patch('os.listdir')
@patch('os.path.join')
@patch.object(utils, 'get_symlink_path')
def test_looking_for_pid(self, get_symlink_path_mock, path_join_mock,
listdir_mock, sleep_mock, open_mock, time_mock):
def test_read():
return "test args"
def test_obj():
pass
test_obj.read = test_read
path_join_mock.return_value = '/'
open_mock.return_value = test_obj
listdir_mock.return_value = ['1000']
get_symlink_path_mock.return_value = "/symlinkpath"
time_mock.side_effect = [0, 0, 0, 0, 0, 0, 6]
out = StringIO.StringIO()
sys.stdout = out
r = utils.looking_for_pid("test args", 5)
self.assertEqual(".....", out.getvalue())
sys.stdout = sys.__stdout__
self.assertEquals(len(r), 1)
self.assertEquals(r[0], {
"pid": "1000",
"exe": "/symlinkpath",
"cmd": "test args"
})
@patch('os.path.normpath')
@patch('os.path.join')
@patch('os.path.dirname')
@patch('os.readlink')
def test_get_symlink_path(self, readlink_mock, dirname_mock, join_mock,
normpath_mock):
normpath_mock.return_value = "test value"
self.assertEquals(utils.get_symlink_path("/"), "test value")
@patch.object(utils, 'pid_exists')
@patch('__builtin__.open')
@patch('os.kill')
def test_save_main_pid_ex(self, kill_mock, open_mock, pid_exists_mock):
def test_write(data):
self.assertEquals(data, "222\n")
def test_close():
pass
def test_obj():
pass
test_obj.write = test_write
test_obj.close = test_close
open_mock.return_value = test_obj
pid_exists_mock.return_value = True
utils.save_main_pid_ex([{"pid": "111",
"exe": "/exe1",
"cmd": ""
},
{"pid": "222",
"exe": "/exe2",
"cmd": ""
},
], "/pidfile", ["/exe1"])
self.assertEquals(open_mock.call_count, 1)
self.assertEquals(pid_exists_mock.call_count, 4)
self.assertEquals(kill_mock.call_count, 1)
@patch('os.path.isfile')
@patch('__builtin__.open')
@patch('os.remove')
def test_check_exitcode(self, remove_mock, open_mock, isfile_mock):
def test_read():
return "777"
def test_close():
pass
def test_obj():
pass
test_obj.read = test_read
test_obj.close = test_close
open_mock.return_value = test_obj
isfile_mock.return_value = True
self.assertEquals(utils.check_exitcode("/tmp/nofile"), 777)
def test_format_with_reload(self):
from resource_management.libraries.functions import format
from resource_management.libraries.functions.format import ConfigurationFormatter
from resource_management.core.environment import Environment
env = Environment()
with env:
# declare some environment variables
env_params = {}
env_params["envfoo"] = "env-foo1"
env_params["envbar"] = "env-bar1"
env.config.params = env_params
# declare some local variables
foo = "foo1"
bar = "bar1"
# make sure local variables and env variables work
message = "{foo} {bar} {envfoo} {envbar}"
formatted_message = format(message)
self.assertEquals("foo1 bar1 env-foo1 env-bar1", formatted_message)
# try the same thing with an instance; we pass in keyword args to be
# combined with the env params
formatter = ConfigurationFormatter()
formatted_message = formatter.format(message, foo="foo2", bar="bar2")
self.assertEquals("foo2 bar2 env-foo1 env-bar1", formatted_message)
# now supply keyword args to override env params
formatted_message = formatter.format(message, envfoo="foobar", envbar="foobarbaz", foo="foo3", bar="bar3")
self.assertEquals("foo3 bar3 foobar foobarbaz", formatted_message)
def test_compare_versions(self):
self.assertEquals(utils.compare_versions("1.7.0", "2.0.0"), -1)
self.assertEquals(utils.compare_versions("2.0.0", "2.0.0"), 0)
self.assertEquals(utils.compare_versions("2.1.0", "2.0.0"), 1)
self.assertEquals(utils.compare_versions("1.7.0_abc", "2.0.0-abc"), -1)
self.assertEquals(utils.compare_versions("2.0.0.abc", "2.0.0_abc"), 0)
self.assertEquals(utils.compare_versions("2.1.0-abc", "2.0.0.abc"), 1)
self.assertEquals(utils.compare_versions("2.1.0-1","2.0.0-2"),1)
self.assertEquals(utils.compare_versions("2.0.0_1","2.0.0-2"),0)
self.assertEquals(utils.compare_versions("2.0.0-1","2.0.0-2"),0)
self.assertEquals(utils.compare_versions("2.0.0_1","2.0.0_2"),0)
self.assertEquals(utils.compare_versions("2.0.0-abc","2.0.0_abc"),0)
class FakeProperties(object):
def __init__(self, prop_map):
self.prop_map = prop_map
def get_property(self, prop_name):
return self.prop_map[prop_name]