| #!/usr/bin/env python |
| |
| ''' |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| ''' |
| import ConfigParser |
| from multiprocessing.pool import ThreadPool |
| import os |
| |
| import pprint |
| from ambari_commons import shell |
| |
| from unittest import TestCase |
| import threading |
| import tempfile |
| import time |
| import traceback |
| from threading import Thread |
| |
| from mock.mock import MagicMock, patch |
| import StringIO |
| import sys |
| |
| from ambari_agent.ActionQueue import ActionQueue |
| from ambari_agent.AgentException import AgentException |
| from ambari_agent.AmbariConfig import AmbariConfig |
| from ambari_agent.BackgroundCommandExecutionHandle import BackgroundCommandExecutionHandle |
| from ambari_agent.CustomServiceOrchestrator import CustomServiceOrchestrator |
| from ambari_agent.FileCache import FileCache |
| from ambari_agent.PythonExecutor import PythonExecutor |
| from ambari_commons import OSCheck |
| from only_for_platform import get_platform, os_distro_value, PLATFORM_WINDOWS |
| |
class TestCustomServiceOrchestrator(TestCase):
    '''
    Unit tests for CustomServiceOrchestrator.

    Collaborators (FileCache, PythonExecutor, the controller) are replaced
    with mocks, so the tests exercise only the orchestrator's own logic:
    dumping commands to JSON, resolving script and hook paths, running and
    cancelling commands, and querying component status / security state.
    '''

    def setUp(self):
        '''Silence stdout and build the sample agent configuration.'''
        # Remember the stream that is active right now (it may be a capture
        # stream installed by the test runner) so tearDown can restore
        # exactly that stream rather than the interpreter's original one.
        self._original_stdout = sys.stdout
        sys.stdout = StringIO.StringIO()
        # generate sample config
        tmpdir = tempfile.gettempdir()
        self.config = AmbariConfig()
        self.config.config = ConfigParser.RawConfigParser()
        self.config.add_section('agent')
        self.config.set('agent', 'prefix', tmpdir)
        self.config.set('agent', 'cache_dir', "/cachedir")
        self.config.add_section('python')
        self.config.set('python', 'custom_actions_dir', tmpdir)

    def _assert_command_file(self, json_file, expected_suffix):
        '''
        Assert that a dumped command file exists, is non-empty, is readable
        and writable by its owner only (skipped on Windows) and that its
        path ends with expected_suffix.
        '''
        self.assertTrue(os.path.exists(json_file))
        self.assertTrue(os.path.getsize(json_file) > 0)
        if get_platform() != PLATFORM_WINDOWS:
            # Compare the permission bits as integers: the text produced by
            # oct() differs between Python versions.
            self.assertEqual(os.stat(json_file).st_mode & 0o777, 0o600)
        self.assertTrue(json_file.endswith(expected_suffix))

    @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
    @patch.object(FileCache, "__init__")
    def test_add_reg_listener_to_controller(self, FileCache_mock):
        '''Creating an orchestrator appends a registration listener to the controller.'''
        FileCache_mock.return_value = None
        dummy_controller = MagicMock()
        config = AmbariConfig()
        tempdir = tempfile.gettempdir()
        config.set('agent', 'prefix', tempdir)
        CustomServiceOrchestrator(config, dummy_controller)
        self.assertTrue(dummy_controller.registration_listeners.append.called)

    @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
    @patch.object(CustomServiceOrchestrator, 'decompressClusterHostInfo')
    @patch("ambari_agent.hostname.public_hostname")
    @patch("os.path.isfile")
    @patch("os.unlink")
    @patch.object(FileCache, "__init__")
    def test_dump_command_to_json(self, FileCache_mock, unlink_mock,
                                  isfile_mock, hostname_mock,
                                  decompress_cluster_host_info_mock):
        '''
        dump_command_to_json() writes execution and status commands to
        owner-only JSON files and decompresses clusterHostInfo only for
        execution commands.
        '''
        FileCache_mock.return_value = None
        hostname_mock.return_value = "test.hst"
        command = {
            'commandType': 'EXECUTION_COMMAND',
            'role': u'DATANODE',
            'roleCommand': u'INSTALL',
            'commandId': '1-1',
            'taskId': 3,
            'clusterName': u'cc',
            'serviceName': u'HDFS',
            'configurations': {'global': {}},
            'configurationTags': {'global': {'tag': 'v1'}},
            'clusterHostInfo': {'namenode_host': ['1'],
                                'slave_hosts': ['0', '1'],
                                'all_hosts': ['h1.hortonworks.com', 'h2.hortonworks.com'],
                                'all_ping_ports': ['8670:0,1']},
            'hostLevelParams': {}
        }

        decompress_cluster_host_info_mock.return_value = {
            'namenode_host': ['h2.hortonworks.com'],
            'slave_hosts': ['h1.hortonworks.com', 'h2.hortonworks.com'],
            'all_hosts': ['h1.hortonworks.com', 'h2.hortonworks.com'],
            'all_ping_ports': ['8670', '8670']}

        config = AmbariConfig()
        tempdir = tempfile.gettempdir()
        config.set('agent', 'prefix', tempdir)
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(config, dummy_controller)
        isfile_mock.return_value = True
        # Test dumping EXECUTION_COMMAND
        json_file = orchestrator.dump_command_to_json(command)
        self._assert_command_file(json_file, "command-3.json")
        self.assertTrue(decompress_cluster_host_info_mock.called)
        # NOTE(review): os.unlink is patched in this test, so this call only
        # records an invocation on unlink_mock; the file is not removed.
        os.unlink(json_file)
        # Test dumping STATUS_COMMAND
        command['commandType'] = 'STATUS_COMMAND'
        decompress_cluster_host_info_mock.reset_mock()
        json_file = orchestrator.dump_command_to_json(command)
        self._assert_command_file(json_file, "status_command.json")
        self.assertFalse(decompress_cluster_host_info_mock.called)
        os.unlink(json_file)
        # Testing side effect of dump_command_to_json
        self.assertEqual(command['public_hostname'], "test.hst")
        self.assertEqual(command['agentConfigParams']['agent']['parallel_execution'], 0)
        # NOTE(review): the os.unlink calls above already satisfy this
        # assertion, so it does not by itself prove the orchestrator
        # unlinks a pre-existing file.
        self.assertTrue(unlink_mock.called)

    @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
    @patch("ambari_agent.hostname.public_hostname")
    @patch("os.path.isfile")
    @patch("os.unlink")
    @patch.object(FileCache, "__init__")
    def test_dump_command_to_json_with_retry(self, FileCache_mock, unlink_mock,
                                             isfile_mock, hostname_mock):
        '''
        dump_command_to_json() produces the same owner-only command file
        when the same command is dumped again with the retry flag set.
        '''
        FileCache_mock.return_value = None
        hostname_mock.return_value = "test.hst"
        command = {
            'commandType': 'EXECUTION_COMMAND',
            'role': u'DATANODE',
            'roleCommand': u'INSTALL',
            'commandId': '1-1',
            'taskId': 3,
            'clusterName': u'cc',
            'serviceName': u'HDFS',
            'configurations': {'global': {}},
            'configurationTags': {'global': {'tag': 'v1'}},
            'clusterHostInfo': {'namenode_host': ['1'],
                                'slave_hosts': ['0', '1'],
                                'all_racks': [u'/default-rack:0'],
                                'ambari_server_host': 'a.b.c',
                                'ambari_server_port': '123',
                                'ambari_server_use_ssl': 'false',
                                'all_ipv4_ips': [u'192.168.12.101:0'],
                                'all_hosts': ['h1.hortonworks.com', 'h2.hortonworks.com'],
                                'all_ping_ports': ['8670:0,1']},
            'hostLevelParams': {}
        }

        config = AmbariConfig()
        tempdir = tempfile.gettempdir()
        config.set('agent', 'prefix', tempdir)
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(config, dummy_controller)
        isfile_mock.return_value = True
        # Test dumping EXECUTION_COMMAND
        json_file = orchestrator.dump_command_to_json(command)
        self._assert_command_file(json_file, "command-3.json")
        # NOTE(review): os.unlink is patched in this test, so this call only
        # records an invocation on unlink_mock; the file is not removed.
        os.unlink(json_file)
        # Test dumping the same EXECUTION_COMMAND again with retry=True
        json_file = orchestrator.dump_command_to_json(command, True)
        self._assert_command_file(json_file, "command-3.json")
        os.unlink(json_file)
        # Testing side effect of dump_command_to_json
        self.assertEqual(command['public_hostname'], "test.hst")
        self.assertEqual(command['agentConfigParams']['agent']['parallel_execution'], 0)
        self.assertTrue(unlink_mock.called)

    @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
    @patch("os.path.exists")
    @patch.object(FileCache, "__init__")
    def test_resolve_script_path(self, FileCache_mock, exists_mock):
        '''
        resolve_script_path() joins base dir and script for an existing path
        and raises AgentException for a missing one.
        '''
        FileCache_mock.return_value = None
        dummy_controller = MagicMock()
        config = AmbariConfig()
        orchestrator = CustomServiceOrchestrator(config, dummy_controller)
        # Testing existing path
        exists_mock.return_value = True
        path = orchestrator.resolve_script_path(
            os.path.join("HBASE", "package"),
            os.path.join("scripts", "hbase_master.py"))
        self.assertEqual(os.path.join("HBASE", "package", "scripts", "hbase_master.py"), path)
        # Testing not existing path
        exists_mock.return_value = False
        self.assertRaises(AgentException, orchestrator.resolve_script_path,
                          "/HBASE", os.path.join("scripts", "hbase_master.py"))

    @patch.object(FileCache, "get_custom_resources_subdir")
    @patch.object(CustomServiceOrchestrator, "resolve_script_path")
    @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
    @patch.object(FileCache, "get_host_scripts_base_dir")
    @patch.object(FileCache, "get_service_base_dir")
    @patch.object(FileCache, "get_hook_base_dir")
    @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
    @patch.object(PythonExecutor, "run_file")
    @patch.object(FileCache, "__init__")
    def test_runCommand(self, FileCache_mock,
                        run_file_mock, dump_command_to_json_mock,
                        get_hook_base_dir_mock, get_service_base_dir_mock,
                        get_host_scripts_base_dir_mock,
                        resolve_hook_script_path_mock,
                        resolve_script_path_mock,
                        get_custom_resources_subdir_mock):
        '''
        runCommand() with a mocked PythonExecutor: a normal run, a status
        command, a forced command name and an unknown script type.
        '''
        FileCache_mock.return_value = None
        command = {
            'commandType': 'EXECUTION_COMMAND',
            'role': 'REGION_SERVER',
            'hostLevelParams': {
                'stack_name': 'HDP',
                'stack_version': '2.0.7',
                'jdk_location': 'some_location'
            },
            'commandParams': {
                'script_type': 'PYTHON',
                'script': 'scripts/hbase_regionserver.py',
                'command_timeout': '600',
                'service_package_folder': 'HBASE'
            },
            'taskId': '3',
            'roleCommand': 'INSTALL'
        }

        get_host_scripts_base_dir_mock.return_value = "/host_scripts"
        get_service_base_dir_mock.return_value = "/basedir/"
        resolve_script_path_mock.return_value = "/basedir/scriptpath"
        resolve_hook_script_path_mock.return_value = \
            ('/hooks_dir/prefix-command/scripts/hook.py',
             '/hooks_dir/prefix-command')
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
        unix_process_id = 111
        orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
        get_hook_base_dir_mock.return_value = "/hooks/"
        # normal run case
        run_file_mock.return_value = {
            'stdout': 'sss',
            'stderr': 'eee',
            'exitcode': 0,
        }
        ret = orchestrator.runCommand(command, "out.txt", "err.txt")
        self.assertEqual(ret['exitcode'], 0)
        self.assertTrue(run_file_mock.called)
        # presumably one call each for the pre-hook, the script itself and
        # the post-hook -- confirm against runCommand
        self.assertEqual(run_file_mock.call_count, 3)

        # running a status command
        run_file_mock.reset_mock()

        def return_traceback(*args, **kwargs):
            return {
                'stderr': traceback.format_exc(),
                'stdout': '',
                'exitcode': 0,
            }
        run_file_mock.side_effect = return_traceback

        status_command = dict(command)
        status_command['commandType'] = 'STATUS_COMMAND'
        del status_command['taskId']
        del status_command['roleCommand']
        ret = orchestrator.runCommand(status_command, "out.txt", "err.txt")
        # format_exc() yields 'None\n' when no exception is being handled,
        # i.e. run_file was not reached from inside an exception handler.
        self.assertEqual('None\n', ret['stderr'])

        run_file_mock.reset_mock()

        # Case when we force another command
        run_file_mock.return_value = {
            'stdout': 'sss',
            'stderr': 'eee',
            'exitcode': 0,
        }
        ret = orchestrator.runCommand(command, "out.txt", "err.txt",
                                      forced_command_name=CustomServiceOrchestrator.SCRIPT_TYPE_PYTHON)
        ## Check that override_output_files was true only during first call
        self.assertEqual(run_file_mock.call_args_list[0][0][8], True)
        self.assertEqual(run_file_mock.call_args_list[1][0][8], False)
        self.assertEqual(run_file_mock.call_args_list[2][0][8], False)
        ## Check that forced_command_name was taken into account
        self.assertEqual(run_file_mock.call_args_list[0][0][1][0],
                         CustomServiceOrchestrator.SCRIPT_TYPE_PYTHON)

        run_file_mock.reset_mock()

        # unknown script type case
        command['commandParams']['script_type'] = "SOME_TYPE"
        ret = orchestrator.runCommand(command, "out.txt", "err.txt")
        self.assertEqual(ret['exitcode'], 1)
        self.assertFalse(run_file_mock.called)
        self.assertTrue("Unknown script type" in ret['stdout'])

        # By default returns empty dictionary
        self.assertEqual(ret['structuredOut'], '{}')

    @patch("ambari_commons.shell.kill_process_with_children")
    @patch.object(CustomServiceOrchestrator, "resolve_script_path")
    @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
    @patch.object(FileCache, "get_host_scripts_base_dir")
    @patch.object(FileCache, "get_service_base_dir")
    @patch.object(FileCache, "get_hook_base_dir")
    @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
    @patch.object(PythonExecutor, "run_file")
    @patch.object(FileCache, "__init__")
    def test_cancel_command(self, FileCache_mock,
                            run_file_mock, dump_command_to_json_mock,
                            get_hook_base_dir_mock, get_service_base_dir_mock,
                            get_host_scripts_base_dir_mock,
                            resolve_hook_script_path_mock, resolve_script_path_mock,
                            kill_process_with_children_mock):
        '''
        cancel_command() on a running command kills its process and appends
        the abort reason to the command's stdout and stderr.
        '''
        FileCache_mock.return_value = None
        command = {
            'role': 'REGION_SERVER',
            'hostLevelParams': {
                'stack_name': 'HDP',
                'stack_version': '2.0.7',
                'jdk_location': 'some_location'
            },
            'commandParams': {
                'script_type': 'PYTHON',
                'script': 'scripts/hbase_regionserver.py',
                'command_timeout': '600',
                'service_package_folder': 'HBASE'
            },
            'taskId': '3',
            'roleCommand': 'INSTALL'
        }

        get_host_scripts_base_dir_mock.return_value = "/host_scripts"
        get_service_base_dir_mock.return_value = "/basedir/"
        resolve_script_path_mock.return_value = "/basedir/scriptpath"
        resolve_hook_script_path_mock.return_value = \
            ('/hooks_dir/prefix-command/scripts/hook.py',
             '/hooks_dir/prefix-command')
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
        unix_process_id = 111
        orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
        get_hook_base_dir_mock.return_value = "/hooks/"
        run_file_mock_return_value = {
            'stdout': 'killed',
            'stderr': 'killed',
            'exitcode': 1,
        }

        def side_effect(*args, **kwargs):
            # Keep the "script" running long enough for the main thread to
            # cancel the command while it is still in progress.
            time.sleep(0.2)
            return run_file_mock_return_value
        run_file_mock.side_effect = side_effect

        # mkstemp() returns an already-open descriptor; only the paths are
        # needed here, so close the descriptors straight away.
        out_fd, out = tempfile.mkstemp()
        err_fd, err = tempfile.mkstemp()
        os.close(out_fd)
        os.close(err_fd)
        pool = ThreadPool(processes=1)
        try:
            async_result = pool.apply_async(orchestrator.runCommand, (command, out, err))

            time.sleep(0.1)
            orchestrator.cancel_command(command['taskId'], 'reason')

            ret = async_result.get()

            self.assertEqual(ret['exitcode'], 1)
            self.assertEqual(ret['stdout'], 'killed\nCommand aborted. Reason: \'reason\'')
            self.assertEqual(ret['stderr'], 'killed\nCommand aborted. Reason: \'reason\'')

            self.assertTrue(kill_process_with_children_mock.called)
            self.assertFalse(command['taskId'] in orchestrator.commands_in_progress)
            self.assertTrue(os.path.exists(out))
            self.assertTrue(os.path.exists(err))
        finally:
            # Release the worker thread and the temp files even when an
            # assertion above fails.
            pool.close()
            pool.join()
            for path in (out, err):
                try:
                    os.remove(path)
                except OSError:
                    pass  # best-effort cleanup

    @patch.object(OSCheck, "os_distribution", new=MagicMock(return_value=os_distro_value))
    @patch.object(CustomServiceOrchestrator, "get_py_executor")
    @patch("ambari_commons.shell.kill_process_with_children")
    @patch.object(FileCache, "__init__")
    @patch.object(CustomServiceOrchestrator, "resolve_script_path")
    @patch.object(CustomServiceOrchestrator, "resolve_hook_script_path")
    def test_cancel_backgound_command(self, resolve_hook_script_path_mock,
                                      resolve_script_path_mock, FileCache_mock,
                                      kill_process_with_children_mock,
                                      get_py_executor_mock):
        '''
        Cancelling a background command kills its process and leaves the
        command with FAILED status in the action queue.
        '''
        FileCache_mock.return_value = None
        FileCache_mock.cache_dir = MagicMock()
        resolve_hook_script_path_mock.return_value = None
        dummy_controller = MagicMock()
        cfg = AmbariConfig()
        cfg.set('agent', 'tolerate_download_failures', 'true')
        cfg.set('agent', 'prefix', '.')
        cfg.set('agent', 'cache_dir', 'background_tasks')

        actionQueue = ActionQueue(cfg, dummy_controller)

        dummy_controller.actionQueue = actionQueue
        orchestrator = CustomServiceOrchestrator(cfg, dummy_controller)
        orchestrator.file_cache = MagicMock()

        def f(a, b):
            return ""
        orchestrator.file_cache.get_service_base_dir = f
        actionQueue.customServiceOrchestrator = orchestrator

        import TestActionQueue
        import copy

        pyex = PythonExecutor(actionQueue.customServiceOrchestrator.tmp_dir,
                              actionQueue.customServiceOrchestrator.config)
        TestActionQueue.patch_output_file(pyex)
        pyex.prepare_process_result = MagicMock()
        get_py_executor_mock.return_value = pyex
        orchestrator.dump_command_to_json = MagicMock()

        lock = threading.RLock()
        complete_done = threading.Condition(lock)

        complete_was_called = {}

        def command_complete_w(process_condenced_result, handle):
            # Record the visit, then wait (at most 3 seconds) until the
            # test body signals that its cancel assertions are done.
            with lock:
                complete_was_called['visited'] = ''
                complete_done.wait(3)

        actionQueue.on_background_command_complete_callback = TestActionQueue.wraped(
            actionQueue.on_background_command_complete_callback, command_complete_w, None)
        execute_command = copy.deepcopy(TestActionQueue.TestActionQueue.background_command)
        actionQueue.put([execute_command])
        actionQueue.processBackgroundQueueSafeEmpty()

        time.sleep(.1)

        orchestrator.cancel_command(19, 'reason')
        self.assertTrue(kill_process_with_children_mock.called)
        kill_process_with_children_mock.assert_called_with(33)

        with lock:
            complete_done.notify_all()

        with lock:
            self.assertTrue('visited' in complete_was_called)

        time.sleep(.1)

        runningCommand = actionQueue.commandStatuses.get_command_status(19)
        self.assertTrue(runningCommand is not None)
        self.assertEqual(runningCommand['status'], ActionQueue.FAILED_STATUS)

    @patch.object(AmbariConfig, "get")
    @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
    @patch.object(PythonExecutor, "run_file")
    @patch.object(FileCache, "__init__")
    @patch.object(FileCache, "get_custom_actions_base_dir")
    def test_runCommand_custom_action(self, get_custom_actions_base_dir_mock,
                                      FileCache_mock,
                                      run_file_mock, dump_command_to_json_mock,
                                      ambari_config_get):
        '''runCommand() for a custom action (ACTIONEXECUTE) calls run_file exactly once.'''
        ambari_config_get.return_value = "0"
        FileCache_mock.return_value = None
        get_custom_actions_base_dir_mock.return_value = "some path"
        command = {
            'role': 'any',
            'commandParams': {
                'script_type': 'PYTHON',
                'script': 'some_custom_action.py',
                'command_timeout': '600',
                'jdk_location': 'some_location'
            },
            'taskId': '3',
            'roleCommand': 'ACTIONEXECUTE'
        }
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
        unix_process_id = 111
        orchestrator.commands_in_progress = {command['taskId']: unix_process_id}
        # normal run case
        run_file_mock.return_value = {
            'stdout': 'sss',
            'stderr': 'eee',
            'exitcode': 0,
        }
        ret = orchestrator.runCommand(command, "out.txt", "err.txt")
        self.assertEqual(ret['exitcode'], 0)
        self.assertTrue(run_file_mock.called)
        # Hooks are not supported for custom actions,
        # that's why run_file() should be called only once
        self.assertEqual(run_file_mock.call_count, 1)

    @patch("os.path.isfile")
    @patch.object(FileCache, "__init__")
    def test_resolve_hook_script_path(self, FileCache_mock, isfile_mock):
        '''
        resolve_hook_script_path() returns None without a hooks dir or hook
        script, and a (script path, hook dir) tuple when the script exists.
        '''
        FileCache_mock.return_value = None
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
        # Testing None param
        res1 = orchestrator.resolve_hook_script_path(None, "prefix", "command",
                                                     "script_type")
        self.assertEqual(res1, None)
        # Testing existing hook script
        isfile_mock.return_value = True
        res2 = orchestrator.resolve_hook_script_path("hooks_dir", "prefix", "command",
                                                     "script_type")
        self.assertEqual(res2, (os.path.join('hooks_dir', 'prefix-command', 'scripts', 'hook.py'),
                                os.path.join('hooks_dir', 'prefix-command')))
        # Testing not existing hook script
        isfile_mock.return_value = False
        res3 = orchestrator.resolve_hook_script_path("hooks_dir", "prefix", "command",
                                                     "script_type")
        self.assertEqual(res3, None)

    @patch.object(CustomServiceOrchestrator, "runCommand")
    @patch.object(FileCache, "__init__")
    def test_requestComponentStatus(self, FileCache_mock, runCommand_mock):
        '''requestComponentStatus() returns whatever runCommand() reports.'''
        FileCache_mock.return_value = None
        status_command = {
            "serviceName": 'HDFS',
            "commandType": "STATUS_COMMAND",
            "clusterName": "",
            "componentName": "DATANODE",
            'configurations': {}
        }
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
        # Test alive case
        runCommand_mock.return_value = {
            "exitcode": 0
        }

        status = orchestrator.requestComponentStatus(status_command)
        self.assertEqual(runCommand_mock.return_value, status)

        # Test dead case
        runCommand_mock.return_value = {
            "exitcode": 1
        }
        status = orchestrator.requestComponentStatus(status_command)
        self.assertEqual(runCommand_mock.return_value, status)

    @patch.object(CustomServiceOrchestrator, "runCommand")
    @patch.object(FileCache, "__init__")
    def test_requestComponentSecurityState(self, FileCache_mock, runCommand_mock):
        '''
        requestComponentSecurityState() returns the reported securityState
        on success and 'UNKNOWN' when the command exits with a failure code.
        '''
        FileCache_mock.return_value = None
        status_command = {
            "serviceName": 'HDFS',
            "commandType": "STATUS_COMMAND",
            "clusterName": "",
            "componentName": "DATANODE",
            'configurations': {}
        }
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)
        # Test securityState
        runCommand_mock.return_value = {
            'exitcode': 0,
            'structuredOut': {'securityState': 'UNSECURED'}
        }

        status = orchestrator.requestComponentSecurityState(status_command)
        self.assertEqual('UNSECURED', status)

        # Test case where exit code indicates failure
        runCommand_mock.return_value = {
            "exitcode": 1
        }
        status = orchestrator.requestComponentSecurityState(status_command)
        self.assertEqual('UNKNOWN', status)

    @patch.object(FileCache, "__init__")
    def test_requestComponentSecurityState_realFailure(self, FileCache_mock):
        '''
        Tests the case where the CustomServiceOrchestrator attempts to call a
        service's security_status method, but fails to do so because the
        script or method was not found; 'UNKNOWN' is expected in that case.
        '''
        FileCache_mock.return_value = None
        status_command = {
            "serviceName": 'BOGUS_SERVICE',
            "commandType": "STATUS_COMMAND",
            "clusterName": "",
            "componentName": "DATANODE",
            'configurations': {}
        }
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)

        status = orchestrator.requestComponentSecurityState(status_command)
        self.assertEqual('UNKNOWN', status)

    @patch.object(CustomServiceOrchestrator, "get_py_executor")
    @patch.object(CustomServiceOrchestrator, "dump_command_to_json")
    @patch.object(FileCache, "__init__")
    @patch.object(FileCache, "get_custom_actions_base_dir")
    def test_runCommand_background_action(self, get_custom_actions_base_dir_mock,
                                          FileCache_mock,
                                          dump_command_to_json_mock,
                                          get_py_executor_mock):
        '''runCommand() for a BACKGROUND_EXECUTION_COMMAND reports exit code 777.'''
        FileCache_mock.return_value = None
        get_custom_actions_base_dir_mock.return_value = "some path"
        command = {
            'role': 'any',
            'commandParams': {
                'script_type': 'PYTHON',
                'script': 'some_custom_action.py',
                'command_timeout': '600',
                'jdk_location': 'some_location'
            },
            'taskId': '13',
            'roleCommand': 'ACTIONEXECUTE',
            'commandType': 'BACKGROUND_EXECUTION_COMMAND',
            '__handle': BackgroundCommandExecutionHandle({'taskId': '13'}, 13,
                                                         MagicMock(), MagicMock())
        }
        dummy_controller = MagicMock()
        orchestrator = CustomServiceOrchestrator(self.config, dummy_controller)

        import TestActionQueue
        pyex = PythonExecutor(orchestrator.tmp_dir, orchestrator.config)
        TestActionQueue.patch_output_file(pyex)
        pyex.condenseOutput = MagicMock()
        get_py_executor_mock.return_value = pyex
        orchestrator.dump_command_to_json = MagicMock()

        ret = orchestrator.runCommand(command, "out.txt", "err.txt")
        # NOTE(review): 777 is presumably the placeholder exit code for a
        # background command that has only been started -- confirm against
        # PythonExecutor.
        self.assertEqual(ret['exitcode'], 777)

    def tearDown(self):
        '''Restore the stdout stream that was active before setUp().'''
        sys.stdout = self._original_stdout
| |
| |