blob: 2e1e4cfc2f92ba2203e1a6f5786f75c7cabbca1f [file] [log] [blame]
#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
from Queue import Queue
from unittest import TestCase
import unittest
from ActionQueue import ActionQueue
from AgentConfig import AgentConfig
import os
import errno
import time
import pprint
import tempfile
import threading
import StringIO
import sys
import logging
from threading import Thread
from mock.mock import patch, MagicMock, call
from CustomServiceOrchestrator import CustomServiceOrchestrator
from PythonExecutor import PythonExecutor
from CommandStatusDict import CommandStatusDict
class TestActionQueue(TestCase):
def setUp(self):
out = StringIO.StringIO()
sys.stdout = out
# save original open() method for later use
self.original_open = open
def tearDown(self):
sys.stdout = sys.__stdout__
datanode_install_command = {
'commandType': 'EXECUTION_COMMAND',
'role': u'HBASE_MASTER',
"componentName": "HBASE_MASTER",
'roleCommand': u'INSTALL',
'commandId': '1-1',
'taskId': 3,
'clusterName': u'cc',
'serviceName': u'HBASE',
'configurations': {'global': {}},
'configurationTags': {'global': {'tag': 'v1'}},
'commandParams': {'script_type': 'PYTHON',
'script': 'scripts/abc.py',
'command_timeout': '600'}
}
hbase_install_command = {
'commandType': 'EXECUTION_COMMAND',
'role': u'HBASE',
'roleCommand': u'INSTALL',
'commandId': '1-1',
'taskId': 7,
"componentName": "HBASE_MASTER",
'clusterName': u'cc',
'serviceName': u'HDFS',
}
status_command = {
"serviceName": 'ACCUMULO',
"commandType": "STATUS_COMMAND",
"clusterName": "c1",
"componentName": "ACCUMULO_MASTER",
'configurations': {},
'roleCommand': "STATUS"
}
status_command_with_config = {
"serviceName": 'ACCUMULO',
"commandType": "STATUS_COMMAND",
"clusterName": "c1",
"componentName": "ACCUMULO_MASTER",
'configurations': {},
"commandParams": {"retrieve_config": "false"},
'roleCommand': "GET_CONFIG"
}
@patch.object(ActionQueue, "process_command")
@patch.object(Queue, "get")
@patch.object(CustomServiceOrchestrator, "__init__")
def test_ActionQueueStartStop(self, CustomServiceOrchestrator_mock,
get_mock, process_command_mock):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
config = MagicMock()
actionQueue = ActionQueue(config, dummy_controller)
actionQueue.start()
time.sleep(3)
actionQueue.stop()
actionQueue.join()
self.assertEqual(actionQueue.stopped(), True, 'Action queue is not stopped.')
self.assertTrue(process_command_mock.call_count > 1)
@patch("traceback.print_exc")
@patch.object(ActionQueue, "execute_command")
def test_process_command(self,
execute_command_mock, print_exc_mock):
dummy_controller = MagicMock()
actionQueue = ActionQueue(AgentConfig("", ""), dummy_controller)
execution_command = {
'commandType': ActionQueue.EXECUTION_COMMAND,
}
wrong_command = {
'commandType': "SOME_WRONG_COMMAND",
}
# Try wrong command
actionQueue.process_command(wrong_command)
self.assertFalse(execute_command_mock.called)
self.assertFalse(print_exc_mock.called)
execute_command_mock.reset_mock()
print_exc_mock.reset_mock()
# Try normal execution
actionQueue.process_command(execution_command)
self.assertTrue(execute_command_mock.called)
self.assertFalse(print_exc_mock.called)
execute_command_mock.reset_mock()
print_exc_mock.reset_mock()
execute_command_mock.reset_mock()
print_exc_mock.reset_mock()
# Try exception to check proper logging
def side_effect(self):
raise Exception("TerribleException")
execute_command_mock.side_effect = side_effect
actionQueue.process_command(execution_command)
self.assertTrue(print_exc_mock.called)
print_exc_mock.reset_mock()
actionQueue.process_command(execution_command)
self.assertTrue(print_exc_mock.called)
@patch.object(ActionQueue, "status_update_callback")
@patch.object(CustomServiceOrchestrator, "requestComponentStatus")
@patch.object(ActionQueue, "execute_command")
@patch.object(CustomServiceOrchestrator, "__init__")
def test_execute_status_command(self, CustomServiceOrchestrator_mock,
execute_command_mock,
requestComponentStatus_mock,
status_update_callback):
CustomServiceOrchestrator_mock.return_value = None
dummy_controller = MagicMock()
actionQueue = ActionQueue(AgentConfig("", ""), dummy_controller)
requestComponentStatus_mock.return_value = {'exitcode': 'dummy report'}
actionQueue.execute_status_command(self.status_command)
report = actionQueue.result()
expected = 'dummy report'
self.assertEqual(len(report['componentStatus']), 1)
self.assertEqual(report['componentStatus'][0]["status"], expected)
self.assertEqual(report['componentStatus'][0]["componentName"], "ACCUMULO_MASTER")
self.assertEqual(report['componentStatus'][0]["serviceName"], "ACCUMULO")
self.assertEqual(report['componentStatus'][0]["clusterName"], "c1")
self.assertTrue(requestComponentStatus_mock.called)
@patch.object(ActionQueue, "status_update_callback")
@patch.object(CustomServiceOrchestrator, "runCommand")
@patch.object(ActionQueue, "execute_command")
@patch('CustomServiceOrchestrator.CustomServiceOrchestrator', autospec=True)
def test_execute_status_command_expect_config(self, CustomServiceOrchestrator_mock,
execute_command_mock,
runCommand_mock,
status_update_callback):
csoMocks = [MagicMock()]
CustomServiceOrchestrator_mock.side_effect = csoMocks
csoMocks[0].status_commands_stdout = None
csoMocks[0].status_commands_stderr = None
dummy_controller = MagicMock()
actionQueue = ActionQueue(AgentConfig("", ""), dummy_controller)
runCommand_mock.return_value = {'configurations': {}}
actionQueue.execute_status_command(self.status_command_with_config)
report = actionQueue.result()
self.assertEqual(len(report['componentStatus']), 1)
self.assertEqual(report['componentStatus'][0]["componentName"], "ACCUMULO_MASTER")
self.assertEqual(report['componentStatus'][0]["serviceName"], "ACCUMULO")
self.assertEqual(report['componentStatus'][0]["clusterName"], "c1")
self.assertEqual(report['componentStatus'][0]["configurations"], {})
self.assertFalse(runCommand_mock.called)
@patch("traceback.print_exc")
@patch.object(ActionQueue, "execute_command")
@patch.object(ActionQueue, "execute_status_command")
def test_process_command(self, execute_status_command_mock,
execute_command_mock, print_exc_mock):
dummy_controller = MagicMock()
actionQueue = ActionQueue(AgentConfig("", ""), dummy_controller)
execution_command = {
'commandType': ActionQueue.EXECUTION_COMMAND,
}
status_command = {
'commandType': ActionQueue.STATUS_COMMAND,
}
wrong_command = {
'commandType': "SOME_WRONG_COMMAND",
}
# Try wrong command
actionQueue.process_command(wrong_command)
self.assertFalse(execute_command_mock.called)
self.assertFalse(execute_status_command_mock.called)
self.assertFalse(print_exc_mock.called)
execute_command_mock.reset_mock()
execute_status_command_mock.reset_mock()
print_exc_mock.reset_mock()
# Try normal execution
actionQueue.process_command(execution_command)
self.assertTrue(execute_command_mock.called)
self.assertFalse(execute_status_command_mock.called)
self.assertFalse(print_exc_mock.called)
execute_command_mock.reset_mock()
execute_status_command_mock.reset_mock()
print_exc_mock.reset_mock()
actionQueue.process_command(status_command)
self.assertFalse(execute_command_mock.called)
self.assertTrue(execute_status_command_mock.called)
self.assertFalse(print_exc_mock.called)
execute_command_mock.reset_mock()
execute_status_command_mock.reset_mock()
print_exc_mock.reset_mock()
# Try exception to check proper logging
def side_effect(self):
raise Exception("TerribleException")
execute_command_mock.side_effect = side_effect
actionQueue.process_command(execution_command)
self.assertTrue(print_exc_mock.called)
print_exc_mock.reset_mock()
execute_status_command_mock.side_effect = side_effect
actionQueue.process_command(execution_command)
self.assertTrue(print_exc_mock.called)
@patch.object(CustomServiceOrchestrator, "resolve_script_path")
@patch("json.load")
@patch("__builtin__.open")
@patch.object(ActionQueue, "status_update_callback")
def test_execute_command(self, status_update_callback_mock, open_mock, json_load_mock,
resolve_script_path_mock):
tempdir = tempfile.gettempdir()
config = MagicMock()
config.get.return_value = "something"
config.getResolvedPath.return_value = tempdir
config.getWorkRootPath.return_value = tempdir
config.getLogPath.return_value = tempdir
# Make file read calls visible
def open_side_effect(file, mode):
if mode == 'r':
file_mock = MagicMock()
file_mock.read.return_value = "Read from " + str(file)
return file_mock
else:
return self.original_open(file, mode)
open_mock.side_effect = open_side_effect
json_load_mock.return_value = ''
resolve_script_path_mock.return_value = "abc.py"
dummy_controller = MagicMock()
actionQueue = ActionQueue(config, dummy_controller)
unfreeze_flag = threading.Event()
python_execution_result_dict = {
'stdout': 'out',
'stderr': 'stderr',
'structuredOut': ''
}
def side_effect(py_file, script_params,
tmpoutfile, tmperrfile, timeout,
tmpstrucoutfile,
override_output_files,
environment_vars):
unfreeze_flag.wait()
return python_execution_result_dict
def patched_aq_execute_command(command):
# We have to perform patching for separate thread in the same thread
with patch.object(PythonExecutor, "run_file") as runCommand_mock:
runCommand_mock.side_effect = side_effect
actionQueue.execute_command(command)
### Test install/start/stop command ###
## Test successful execution with configuration tags
python_execution_result_dict['status'] = 'COMPLETE'
python_execution_result_dict['exitcode'] = 0
# We call method in a separate thread
execution_thread = Thread(target=patched_aq_execute_command,
args=(self.datanode_install_command, ))
execution_thread.start()
# check in progress report
# wait until ready
while True:
time.sleep(0.1)
report = actionQueue.result()
if len(report['reports']) != 0:
break
expected = {'status': 'IN_PROGRESS',
'stderr': 'Read from {0}/errors-3.txt'.format(tempdir),
'stdout': 'Read from {0}/output-3.txt'.format(tempdir),
'structuredOut': '',
'clusterName': u'cc',
'roleCommand': u'INSTALL',
'serviceName': u'HBASE',
'role': u'HBASE_MASTER',
'actionId': '1-1',
'taskId': 3,
'exitcode': 777}
self.assertEqual(report['reports'][0], expected)
# Continue command execution
unfreeze_flag.set()
# wait until ready
while report['reports'][0]['status'] == 'IN_PROGRESS':
time.sleep(0.1)
report = actionQueue.result()
# check report
configname = os.path.join(tempdir, 'command-3.json')
expected = {'status': 'COMPLETED',
'stderr': 'stderr',
'stdout': 'out',
'clusterName': u'cc',
'configurationTags': {'global': {'tag': 'v1'}},
'roleCommand': u'INSTALL',
'serviceName': u'HBASE',
'role': u'HBASE_MASTER',
'actionId': '1-1',
'taskId': 3,
'structuredOut': '',
'exitcode': 0,
'allocatedPorts': {},
'folders': {'AGENT_LOG_ROOT': tempdir, 'AGENT_WORK_ROOT': tempdir}}
self.assertEqual(len(report['reports']), 1)
self.assertEqual(report['reports'][0], expected)
self.assertTrue(os.path.isfile(configname))
# Check that we had 2 status update calls ( IN_PROGRESS and COMPLETE)
self.assertEqual(status_update_callback_mock.call_count, 2)
os.remove(configname)
# now should not have reports (read complete/failed reports are deleted)
report = actionQueue.result()
self.assertEqual(len(report['reports']), 0)
## Test failed execution
python_execution_result_dict['status'] = 'FAILED'
python_execution_result_dict['exitcode'] = 13
# We call method in a separate thread
execution_thread = Thread(target=patched_aq_execute_command,
args=(self.datanode_install_command, ))
execution_thread.start()
unfreeze_flag.set()
# check in progress report
# wait until ready
report = actionQueue.result()
while len(report['reports']) == 0 or \
report['reports'][0]['status'] == 'IN_PROGRESS':
time.sleep(0.1)
report = actionQueue.result()
# check report
expected = {'status': 'FAILED',
'stderr': 'stderr',
'stdout': 'out',
'clusterName': u'cc',
'roleCommand': u'INSTALL',
'serviceName': u'HBASE',
'role': u'HBASE_MASTER',
'actionId': '1-1',
'taskId': 3,
'structuredOut': '',
'exitcode': 13}
self.assertEqual(len(report['reports']), 1)
self.assertEqual(report['reports'][0], expected)
# now should not have reports (read complete/failed reports are deleted)
report = actionQueue.result()
self.assertEqual(len(report['reports']), 0)
if __name__ == "__main__":
logging.basicConfig(format='%(asctime)s %(message)s', level=logging.DEBUG)
unittest.main()