blob: b44b45499d95d5ae160623ad09c88236bdb24d2d [file] [log] [blame]
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
from mock.mock import patch, MagicMock, Mock
from unittest import TestCase
from resource_management.libraries.functions.security_commons import *
from datetime import datetime, timedelta
from tempfile import gettempdir
import os
class TestSecurityCommons(TestCase):
@patch('os.path.isfile')
def test_validate_security_config_properties(self, os_path_isfile_mock):
# Testing with correct values_checks
params = {}
params["config_file"] = {}
params["config_file"]["property1"] = ["firstCase"]
params["config_file"]["property2"] = ["secondCase"]
params["config_file"]["property3"] = ["thirdCase"]
params["config_file"]["property4"] = ["fourthCase"]
params["config_file"]["property5"] = ["fifthCase"]
params["config_file"]["property6"] = ["sixthCase"]
configuration_rules = {}
configuration_rules["config_file"] = {}
configuration_rules["config_file"]["value_checks"] = {}
configuration_rules["config_file"]["value_checks"]["property1"] = ["firstCase"]
configuration_rules["config_file"]["value_checks"]["property2"] = ["secondCase"]
self.assertEquals(not validate_security_config_properties(params, configuration_rules),
True) # issues is empty
#Testing with correct empty_checks
configuration_rules["config_file"]["empty_checks"] = ["property3", "property4"]
self.assertEquals(not validate_security_config_properties(params, configuration_rules),
True) # issues is empty
# Testing with correct read_checks
configuration_rules["config_file"]["read_checks"] = ["property5", "property6"]
os_path_isfile_mock.return_value = True
self.assertEquals(not validate_security_config_properties(params, configuration_rules),
True) # issues is empty
# Testing with wrong values_checks
configuration_rules["config_file"]["value_checks"]["property1"] = ["failCase"]
configuration_rules["config_file"]["value_checks"]["property2"] = ["failCase2"]
self.assertEquals(not validate_security_config_properties(params, configuration_rules),
False) # Doesn't return an empty issues
configuration_rules["config_file"]["value_checks"]["property1"] = ["firstCase"]
configuration_rules["config_file"]["value_checks"]["property2"] = ["secondCase"]
# Testing with a property which doesn't exist in params
configuration_rules["config_file"]["empty_checks"].append("property7")
self.assertEquals(not validate_security_config_properties(params, configuration_rules),
False) # Doesn't return an empty list
configuration_rules["config_file"]["empty_checks"].remove("property7")
# Testing with a property which doesn't exist in params
configuration_rules["config_file"]["read_checks"].append("property8")
self.assertEquals(not validate_security_config_properties(params, configuration_rules),
False) # Doesn't return an empty list
configuration_rules["config_file"]["read_checks"].remove("property8")
#Testing empty_checks and read_checks with an empty params[config_file][property]
params["config_file"]["property1"] = [""]
params["config_file"]["property2"] = [""]
params["config_file"]["property3"] = [""]
params["config_file"]["property4"] = [""]
params["config_file"]["property5"] = [""]
params["config_file"]["property6"] = [""]
self.assertEquals(not validate_security_config_properties(params, configuration_rules),
False) # Doesn't return an empty list
def test_build_expectations(self):
config_file = 'config_file'
value_checks = {}
value_checks["property1"] = ["value1"]
value_checks["property2"] = ["value2"]
empty_checks = ["property3", "property4"]
read_checks = ["property5", "property6"]
result = build_expectations(config_file, value_checks, empty_checks, read_checks)
self.assertEquals(len(result[config_file]['value_checks']), len(value_checks))
self.assertEquals(len(result[config_file]['empty_checks']), len(empty_checks))
self.assertEquals(len(result[config_file]['read_checks']), len(read_checks))
# Testing that returns empty dict if is called without values
result = build_expectations(config_file, [], [], [])
self.assertEquals(not result[config_file].items(), True)
def test_get_params_from_filesystem_JAAS(self):
conf_dir = gettempdir()
jaas_file = "test_jaas.conf"
jaas_file_path = conf_dir + os.sep + jaas_file
# Create temporary test file (mocking a files for reading isn't available for the current version
# of the library
with open(jaas_file_path, "w+") as f:
f.write('Client {\n'
' com.sun.security.auth.module.Krb5LoginModule required\n'
' useKeyTab=true\n'
' storeKey=true\n'
' useTicketCache=false\n'
' keyTab="/etc/security/keytabs/hbase.service.keytab"\n'
' principal="hbase/vp-ambari-ranger-med-0120-2.cs1cloud.internal@EXAMPLE.COM";\n'
'};\n')
config_file = {
jaas_file : FILE_TYPE_JAAS_CONF
}
result = get_params_from_filesystem(conf_dir, config_file)
expected = {
'test_jaas': {
'Client': {
'keyTab': '/etc/security/keytabs/hbase.service.keytab',
'useTicketCache': 'false',
'storeKey': 'true',
'com.sun.security.auth.module.Krb5LoginModule': 'required',
'useKeyTab': 'true',
'principal': 'hbase/vp-ambari-ranger-med-0120-2.cs1cloud.internal@EXAMPLE.COM'
}
}
}
self.assertEquals(expected, result)
os.unlink(jaas_file_path)
print result
@patch('xml.etree.ElementTree.parse')
def test_get_params_from_filesystem(self, et_parser_mock):
conf_dir = gettempdir()
config_file = {
"config.xml": FILE_TYPE_XML
}
prop1_name_mock = MagicMock()
prop1_name_mock.text.return_value = 'property1'
prop1_value_mock = MagicMock()
prop1_value_mock.text.return_value = 'true'
prop2_name_mock = MagicMock()
prop2_name_mock.text.return_value = 'property2'
prop2_value_mock = MagicMock()
prop2_value_mock.text.return_value = 'false'
prop3_name_mock = MagicMock()
prop3_name_mock.text.return_value = 'property3'
prop3_value_mock = MagicMock()
prop3_value_mock.text.return_value = 'true'
props = []
props.append([prop1_name_mock, prop1_value_mock])
props.append([prop2_name_mock, prop2_value_mock])
props.append([prop3_name_mock, prop3_value_mock])
element_tree_mock = MagicMock()
et_parser_mock.return_value = element_tree_mock
get_root_mock = MagicMock()
element_tree_mock.getroot.return_value = get_root_mock
get_root_mock.getchildren.return_value = props
result = get_params_from_filesystem(conf_dir, config_file)
# Testing that the mock is called with the correct path
et_parser_mock.assert_called_with(conf_dir + os.sep + "config.xml")
#Testing that the dictionary and the list from the result are not empty
self.assertEquals(not result, False)
self.assertEquals(not result[result.keys()[0]], False)
#Testing that returns an empty dictionary if is called with no props
empty_props = []
get_root_mock.getchildren.return_value = empty_props
result = get_params_from_filesystem(conf_dir, config_file)
self.assertEquals(not result, False)
self.assertEquals(not result['config'].items(), True)
#Testing that returns an empty dictionary if is called with empty config_files
empty_config_file = {}
result = get_params_from_filesystem(conf_dir, empty_config_file)
self.assertEquals(not result, True)
#Test that params returns an exception
et_parser_mock.reset_mock()
et_parser_mock.side_effect = Exception("Invalid path")
try:
get_params_from_filesystem(conf_dir, config_file)
except:
self.assertTrue(True)
@patch('os.path.exists')
@patch('os.makedirs')
@patch('os.path.isfile')
@patch('ambari_simplejson.load')
@patch('resource_management.libraries.functions.security_commons.new_cached_exec')
@patch('__builtin__.open')
def test_cached_executor(self, open_file_mock, new_cached_exec_mock, ambari_simplejson_load_mock,
os_isfile_mock, os_makedirs_mock, os_path_exists_mock):
# Test that function works when is called with correct parameters
temp_dir = gettempdir()
kinit_path = "kinit"
user = "user"
hostname ="hostnamne"
keytab_file ="/etc/security/keytabs/nn.service.keytab"
principal = "nn/c6405.ambari.apache.org@EXAMPLE.COM"
key = str(hash("%s|%s" % (principal, keytab_file)))
expiration_time = 30
filename = key + "_tmp.txt"
file_path = temp_dir + os.sep + "kinit_executor_cache"
os_path_exists_mock.return_value = True
os_isfile_mock.return_value = True
output = {}
output[key] = {}
output[key] = {"last_successful_execution": str(datetime.now())}
ambari_simplejson_load_mock.return_value = output
cached_kinit_executor(kinit_path, user, keytab_file, principal, hostname, temp_dir, expiration_time)
os_path_exists_mock.assert_called_with(file_path)
os_isfile_mock.assert_called_with(file_path + os.sep + filename)
open_file_mock.assert_called_with(file_path + os.sep + filename, 'r')
# Test that the new_cached_exec function is called if the time spend since the last call is greater than 30 minutes
last_successful_executation = datetime.now()
last_successful_executation = last_successful_executation - timedelta(minutes=31)
output_error = {}
output_error[key] = {}
output_error[key] = {"last_successful_execution": str(last_successful_executation)}
ambari_simplejson_load_mock.reset_mock()
ambari_simplejson_load_mock.return_value = output_error
new_cached_exec_mock.return_value = output
cached_kinit_executor(kinit_path, user, keytab_file, principal, hostname, temp_dir, expiration_time)
self.assertTrue(new_cached_exec_mock.called)
new_cached_exec_mock.assert_called_with(key, file_path + os.sep + filename, kinit_path, temp_dir, user, keytab_file, principal, hostname)
# Test that the makedirs function is called with correct path when the directory doesn't exist
os_path_exists_mock.return_value = False
cached_kinit_executor(kinit_path, user, keytab_file, principal, hostname, temp_dir, expiration_time)
os_makedirs_mock.assert_called_with(file_path)
# Test that the ambari_simplejson throws an exception
os_path_exists_mock.return_value = True
ambari_simplejson_load_mock.reset_mock()
ambari_simplejson_load_mock.side_effect = Exception("Invalid file")
try:
cached_kinit_executor(kinit_path, user, keytab_file, principal, hostname, temp_dir, expiration_time)
except:
self.assertTrue(True)
# Test that the new_cached_exec function is called if the output doesn't have data
ambari_simplejson_load_mock.reset_mock()
ambari_simplejson_load_mock.return_value = None
new_cached_exec_mock.return_value = output
cached_kinit_executor(kinit_path, user, keytab_file, principal, hostname, temp_dir, expiration_time)
self.assertTrue(new_cached_exec_mock.called)
new_cached_exec_mock.assert_called_with(key, file_path + os.sep + filename, kinit_path, temp_dir, user, keytab_file, principal, hostname)