blob: 2d4205b43cfdcc0e5d442a65c4fcd04820942424 [file] [log] [blame]
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import os
import platform
import sys
import unittest
import StringIO
from mock.mock import patch, MagicMock
from only_for_platform import os_distro_value
from ambari_commons import os_utils
from urllib2 import HTTPError
import shutil
# Mock classes for reading from a file
class MagicFile(object):
def __init__(self, data):
self.data = data
def read(self):
return self.data
def __exit__(self, exc_type, exc_val, exc_tb):
pass
def __enter__(self):
return self
pass
project_dir = os.path.join(os.path.abspath(os.path.dirname(__file__)),os.path.normpath("../../../../"))
shutil.copyfile(project_dir+"/ambari-server/conf/unix/ambari.properties", "/tmp/ambari.properties")
# We have to use this import HACK because the filename contains a dash
_search_file = os_utils.search_file
def search_file_proxy(filename, searchpatch, pathsep=os.pathsep):
global _search_file
if "ambari.properties" in filename:
return "/tmp/ambari.properties"
return _search_file(filename, searchpatch, pathsep)
os_utils.search_file = search_file_proxy
with patch.object(platform, "linux_distribution", return_value = MagicMock(return_value=('Redhat', '7.4', 'Final'))):
with patch("os.path.isdir", return_value = MagicMock(return_value=True)):
with patch("os.access", return_value = MagicMock(return_value=True)):
with patch.object(os_utils, "parse_log4j_file", return_value={'ambari.log.dir': '/var/log/ambari-server'}):
with patch("platform.linux_distribution", return_value = os_distro_value):
with patch("os.symlink"):
with patch.object(os_utils, "is_service_exist", return_value = True):
with patch("glob.glob", return_value = ['/etc/init.d/postgresql-9.3']):
_ambari_server_ = __import__('ambari-server')
with patch("__builtin__.open"):
from ambari_commons.exceptions import FatalException, NonFatalException
from ambari_server.properties import Properties
from ambari_server.setupTrustedProxy import setup_trusted_proxy, TPROXY_SUPPORT_ENABLED, PROXYUSER_HOSTS, PROXYUSER_USERS, PROXYUSER_GROUPS
class TestSetupTrustedProxy(unittest.TestCase):
@patch("ambari_server.setupTrustedProxy.is_server_runing")
def test_tproxy_setup_should_fail_if_server_is_not_running(self, is_server_runing_mock):
out = StringIO.StringIO()
sys.stdout = out
is_server_runing_mock.return_value = (False, 0)
options = self._create_empty_options_mock()
try:
setup_trusted_proxy(options)
self.fail("Should fail with non-fatal exception")
except FatalException as e:
self.assertTrue("Ambari Server is not running" in e.reason)
pass
sys.stdout = sys.__stdout__
pass
@patch("ambari_server.setupTrustedProxy.get_silent")
@patch("ambari_server.setupTrustedProxy.is_server_runing")
def test_silent_mode_is_not_allowed(self, is_server_runing_mock, get_silent_mock):
out = StringIO.StringIO()
sys.stdout = out
is_server_runing_mock.return_value = (True, 0)
get_silent_mock.return_value = True
options = self._create_empty_options_mock()
try:
setup_trusted_proxy(options)
self.fail("Should fail with fatal exception")
except NonFatalException as e:
self.assertTrue("setup-trusted-proxy is not enabled in silent mode." in e.reason)
pass
sys.stdout = sys.__stdout__
pass
@patch("ambari_server.setupTrustedProxy.get_silent")
@patch("ambari_server.setupTrustedProxy.is_server_runing")
def test_invalid_tproxy_enabled_cli_option_should_result_in_error(self, is_server_runing_mock, get_silent_mock):
out = StringIO.StringIO()
sys.stdout = out
is_server_runing_mock.return_value = (True, 0)
get_silent_mock.return_value = False
options = self._create_empty_options_mock()
options.tproxy_enabled = 'not_true_or_false'
try:
setup_trusted_proxy(options)
self.fail("Should fail with fatal exception")
except FatalException as e:
self.assertTrue("--tproxy-enabled should be to either 'true' or 'false'" in e.reason)
pass
sys.stdout = sys.__stdout__
pass
@patch("ambari_server.setupTrustedProxy.perform_changes_via_rest_api")
@patch("ambari_server.setupTrustedProxy.get_YN_input")
@patch("ambari_server.setupTrustedProxy.get_validated_string_input")
@patch("ambari_server.setupTrustedProxy.get_ambari_properties")
@patch("ambari_server.setupTrustedProxy.get_silent")
@patch("ambari_server.setupTrustedProxy.is_server_runing")
@patch("ambari_server.setupTrustedProxy.get_json_via_rest_api")
def test_tproxy_is_enabled_for_two_proxy_users(self, get_json_via_rest_api_mock, is_server_runing_mock, get_silent_mock,
get_ambari_properties_mock, get_validated_string_input_mock, get_YN_input_mock, perform_changes_via_rest_api_mock):
out = StringIO.StringIO()
sys.stdout = out
get_json_via_rest_api_mock.return_value = (200, {})
is_server_runing_mock.return_value = (True, 0)
get_silent_mock.return_value = False
get_ambari_properties_mock.return_value = Properties()
user_name1 = 'knox'
hosts1 = 'knox_hosts'
users1 = 'knox_users'
groups1 = 'knox_groups'
user_name2 = 'admin'
hosts2 = 'admin_hosts'
users2 = 'admin_users'
groups2 = 'admin_groups'
get_validated_string_input_mock.side_effect = [user_name1, hosts1, users1, groups1, user_name2, hosts2, users2, groups2]
get_YN_input_mock.side_effect = [True, False] #answer 'True' for the first time when asking for a new proxy user addition and then 'False' (indicating we do not want to add more proxy users)
options = self._create_empty_options_mock()
options.tproxy_enabled = 'true'
setup_trusted_proxy(options)
self.assertTrue(perform_changes_via_rest_api_mock.called)
requestCall = perform_changes_via_rest_api_mock.call_args_list[0]
args, kwargs = requestCall
requestData = args[5]
self.assertTrue(isinstance(requestData, dict))
tproxyProperties = requestData['Configuration']['properties']
self.assertEqual(tproxyProperties[TPROXY_SUPPORT_ENABLED], 'true')
self.assertEqual(tproxyProperties[PROXYUSER_HOSTS.format(user_name1)], hosts1)
self.assertEqual(tproxyProperties[PROXYUSER_USERS.format(user_name1)], users1)
self.assertEqual(tproxyProperties[PROXYUSER_GROUPS.format(user_name1)], groups1)
self.assertEqual(tproxyProperties[PROXYUSER_HOSTS.format(user_name2)], hosts2)
self.assertEqual(tproxyProperties[PROXYUSER_USERS.format(user_name2)], users2)
self.assertEqual(tproxyProperties[PROXYUSER_GROUPS.format(user_name2)], groups2)
sys.stdout = sys.__stdout__
pass
@patch("ambari_server.setupTrustedProxy.perform_changes_via_rest_api")
@patch("ambari_server.setupTrustedProxy.get_ambari_properties")
@patch("ambari_server.setupTrustedProxy.get_silent")
@patch("ambari_server.setupTrustedProxy.is_server_runing")
@patch("ambari_server.setupTrustedProxy.get_json_via_rest_api")
def test_disabling_tproxy_support(self, get_json_via_rest_api_mock, is_server_runing_mock, get_silent_mock, get_ambari_properties_mock, perform_changes_via_rest_api_mock):
out = StringIO.StringIO()
sys.stdout = out
get_json_via_rest_api_mock.return_value = (200, {})
is_server_runing_mock.return_value = (True, 0)
get_silent_mock.return_value = False
properties = Properties()
get_ambari_properties_mock.return_value = properties
options = self._create_empty_options_mock()
options.tproxy_enabled = 'false'
setup_trusted_proxy(options)
self.assertTrue(perform_changes_via_rest_api_mock.called)
requestCall = perform_changes_via_rest_api_mock.call_args_list[0]
args, kwargs = requestCall
requestMethod = args[4]
self.assertTrue(isinstance(requestMethod, str))
self.assertEqual(requestMethod, "DELETE")
sys.stdout = sys.__stdout__
pass
@patch("ambari_server.setupTrustedProxy.get_silent")
@patch("ambari_server.setupTrustedProxy.is_server_runing")
@patch("os.path.isfile")
def test_enable_tproxy_support_using_configuration_file_path_from_command_line_should_fail_if_file_does_not_exist(self, isfile_mock, is_server_runing_mock, get_silent_mock):
out = StringIO.StringIO()
sys.stdout = out
is_server_runing_mock.return_value = (True, 0)
get_silent_mock.return_value = False
isfile_mock.return_value = False
options = self._create_empty_options_mock()
options.tproxy_enabled = 'true'
options.tproxy_configuration_file_path = 'samplePath'
try:
setup_trusted_proxy(options)
self.fail("Should fail with fatal exception")
except FatalException as e:
self.assertTrue("--tproxy-configuration-file-path is set to a non-existing file" in e.reason)
pass
sys.stdout = sys.__stdout__
pass
@patch("ambari_server.setupTrustedProxy.perform_changes_via_rest_api")
@patch("ambari_server.setupTrustedProxy.get_ambari_properties")
@patch("ambari_server.setupTrustedProxy.get_silent")
@patch("ambari_server.setupTrustedProxy.is_server_runing")
@patch("ambari_server.setupTrustedProxy.get_json_via_rest_api")
@patch("os.path.isfile")
@patch('__builtin__.open')
def test_enable_tproxy_support_using_configuration_file_path_from_command_line(self, open_mock, isfile_mock, get_json_via_rest_api_mock, is_server_runing_mock, get_silent_mock, get_ambari_properties_mock, perform_changes_via_rest_api_mock):
out = StringIO.StringIO()
sys.stdout = out
get_json_via_rest_api_mock.return_value = (200, {})
is_server_runing_mock.return_value = (True, 0)
get_silent_mock.return_value = False
properties = Properties()
get_ambari_properties_mock.return_value = properties
isfile_mock.return_value = True
tproxy_configurations = "["\
" {"\
" \"proxyuser\" : \"knox\"," \
" \"hosts\" : \"host1\"," \
" \"users\" : \"user1\"," \
" \"groups\" : \"group1\"" \
" }," \
" {"\
" \"proxyuser\": \"admin\"," \
" \"hosts\" : \"host2\"," \
" \"users\" : \"user2\"," \
" \"groups\" : \"group2\"" \
" }" \
"]"
mock_file = MagicFile(tproxy_configurations)
open_mock.side_effect = [mock_file]
options = self._create_empty_options_mock()
options.tproxy_enabled = 'true'
options.tproxy_configuration_file_path = 'samplePath'
setup_trusted_proxy(options)
self.assertTrue(perform_changes_via_rest_api_mock.called)
requestCall = perform_changes_via_rest_api_mock.call_args_list[0]
args, kwargs = requestCall
requestData = args[5]
self.assertTrue(isinstance(requestData, dict))
tproxyProperties = requestData['Configuration']['properties']
self.assertEqual(tproxyProperties[TPROXY_SUPPORT_ENABLED], 'true')
user_name1="knox"
self.assertEqual(tproxyProperties[PROXYUSER_HOSTS.format(user_name1)], "host1")
self.assertEqual(tproxyProperties[PROXYUSER_USERS.format(user_name1)], "user1")
self.assertEqual(tproxyProperties[PROXYUSER_GROUPS.format(user_name1)], "group1")
user_name2="admin"
self.assertEqual(tproxyProperties[PROXYUSER_HOSTS.format(user_name2)], "host2")
self.assertEqual(tproxyProperties[PROXYUSER_USERS.format(user_name2)], "user2")
self.assertEqual(tproxyProperties[PROXYUSER_GROUPS.format(user_name2)], "group2")
sys.stdout = sys.__stdout__
pass
def _create_empty_options_mock(self):
options = MagicMock()
options.tproxy_enabled = None
options.tproxy_configuration_file_path = None
return options