#!/usr/bin/env python
"""
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import logging
import json
import subprocess
import time
import traceback
from resource_management.core import global_lock
from resource_management.core import shell
from resource_management.core.exceptions import Fail
from resource_management.core.resources import Execute
from resource_management.libraries.functions import format
from resource_management.libraries.functions import get_kinit_path
from resource_management.libraries.script.script import Script
OK_MESSAGE = "The application reported a '{0}' state in {1:.3f}s"
MESSAGE_WITH_STATE_AND_INSTANCES = "The application reported a '{0}' state in {1:.3f}s. [Live: {2}, Desired: {3}]"
CRITICAL_MESSAGE_WITH_STATE = "The application reported a '{0}' state. Check took {1:.3f}s"
CRITICAL_MESSAGE = "Application information could not be retrieved"
# result codes
CRITICAL_RESULT_CODE = 'CRITICAL'
OK_RESULT_CODE = 'OK'
UNKNOWN_STATUS_CODE = 'UNKNOWN'
SECURITY_ENABLED_KEY = '{{cluster-env/security_enabled}}'
HIVE_PRINCIPAL_KEY = '{{hive-interactive-site/hive.llap.daemon.service.principal}}'
HIVE_PRINCIPAL_DEFAULT = 'default.hive.principal'
HIVE_PRINCIPAL_KEYTAB_KEY = '{{hive-interactive-site/hive.llap.daemon.keytab.file}}'
HIVE_PRINCIPAL_KEYTAB_DEFAULT = 'default.hive.keytab'
HIVE_AUTHENTICATION_DEFAULT = 'NOSASL'
HIVE_USER_KEY = '{{hive-env/hive_user}}'
HIVE_USER_DEFAULT = 'default.smoke.user'
STACK_ROOT = '{{cluster-env/stack_root}}'
STACK_ROOT_DEFAULT = Script.get_stack_root()
LLAP_APP_NAME_KEY = '{{hive-interactive-env/llap_app_name}}'
LLAP_APP_NAME_DEFAULT = 'llap0'
# The configured Kerberos executable search paths, if any
KERBEROS_EXECUTABLE_SEARCH_PATHS_KEY = '{{kerberos-env/executable_search_paths}}'
CHECK_COMMAND_TIMEOUT_KEY = 'check.command.timeout'
CHECK_COMMAND_TIMEOUT_DEFAULT = 120.0
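# Note (illustrative, not taken from any alert definition): the timeout above is presumably in
# seconds and can be overridden per alert via the script parameter 'check.command.timeout',
# supplied to execute() in its 'parameters' argument, e.g. {'check.command.timeout': 120.0}.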
# Mapping of LLAP app states to 'user friendly' state names.
llap_app_state_dict = {'RUNNING_ALL': 'RUNNING',
                       'RUNNING_PARTIAL': 'RUNNING',
                       'COMPLETE': 'NOT RUNNING',
                       'LAUNCHING': 'LAUNCHING',
                       'APP_NOT_FOUND': 'APP NOT FOUND'}
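# Example: llap_app_state_dict.get('RUNNING_ALL', 'RUNNING_ALL') returns 'RUNNING'; any raw state
# not listed above (hypothetical example: 'STOPPING') falls through dict.get() unchanged, which is
# how the lookups in execute() below use this mapping.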
logger = logging.getLogger('ambari_alerts')

def get_tokens():
  """
  Returns a tuple of tokens in the format {{site/property}} that will be used
  to build the dictionary passed into execute
  """
  return (SECURITY_ENABLED_KEY, KERBEROS_EXECUTABLE_SEARCH_PATHS_KEY, HIVE_PRINCIPAL_KEY, HIVE_PRINCIPAL_KEYTAB_KEY,
          HIVE_USER_KEY, STACK_ROOT, LLAP_APP_NAME_KEY)

def execute(configurations={}, parameters={}, host_name=None):
  """
  Returns a tuple containing the result code and a pre-formatted result label

  Keyword arguments:
  configurations (dictionary): a mapping of configuration key to value
  parameters (dictionary): a mapping of script parameter key to value
  host_name (string): the name of this host where the alert is running
  """
  LLAP_APP_STATUS_CMD_TIMEOUT = 0

  if configurations is None:
    return (UNKNOWN_STATUS_CODE, ['There were no configurations supplied to the script.'])

  result_code = None
  try:
    security_enabled = False
    if SECURITY_ENABLED_KEY in configurations:
      security_enabled = str(configurations[SECURITY_ENABLED_KEY]).upper() == 'TRUE'

    # 'check.command.timeout' is a script parameter, not a configuration token, so look it up
    # in 'parameters'.
    check_command_timeout = CHECK_COMMAND_TIMEOUT_DEFAULT
    if CHECK_COMMAND_TIMEOUT_KEY in parameters:
      check_command_timeout = float(parameters[CHECK_COMMAND_TIMEOUT_KEY])

    hive_user = HIVE_USER_DEFAULT
    if HIVE_USER_KEY in configurations:
      hive_user = configurations[HIVE_USER_KEY]

    llap_app_name = LLAP_APP_NAME_DEFAULT
    if LLAP_APP_NAME_KEY in configurations:
      llap_app_name = configurations[LLAP_APP_NAME_KEY]
    if security_enabled:
      if HIVE_PRINCIPAL_KEY in configurations:
        llap_principal = configurations[HIVE_PRINCIPAL_KEY]
      else:
        llap_principal = HIVE_PRINCIPAL_DEFAULT
      llap_principal = llap_principal.replace('_HOST', host_name.lower())

      llap_keytab = HIVE_PRINCIPAL_KEYTAB_DEFAULT
      if HIVE_PRINCIPAL_KEYTAB_KEY in configurations:
        llap_keytab = configurations[HIVE_PRINCIPAL_KEYTAB_KEY]

      # Get the configured Kerberos executable search paths, if any
      if KERBEROS_EXECUTABLE_SEARCH_PATHS_KEY in configurations:
        kerberos_executable_search_paths = configurations[KERBEROS_EXECUTABLE_SEARCH_PATHS_KEY]
      else:
        kerberos_executable_search_paths = None

      kinit_path_local = get_kinit_path(kerberos_executable_search_paths)
      kinitcmd = format("{kinit_path_local} -kt {llap_keytab} {llap_principal}; ")

      # prevent concurrent kinit
      kinit_lock = global_lock.get_lock(global_lock.LOCK_TYPE_KERBEROS)
      kinit_lock.acquire()
      try:
        Execute(kinitcmd, user=hive_user,
                path=["/bin/", "/usr/bin/", "/usr/lib/hive/bin/", "/usr/sbin/"],
                timeout=10)
      finally:
        kinit_lock.release()
    start_time = time.time()

    # Use the stack root from the cluster configuration when available (it is requested in
    # get_tokens()); otherwise fall back to the local default.
    stack_root = STACK_ROOT_DEFAULT
    if STACK_ROOT in configurations:
      stack_root = configurations[STACK_ROOT]
    llap_status_cmd = stack_root + format("/current/hive-server2/bin/hive --service llapstatus --name {llap_app_name} --findAppTimeout {LLAP_APP_STATUS_CMD_TIMEOUT}")

    code, output, error = shell.checked_call(llap_status_cmd, user=hive_user, stderr=subprocess.PIPE,
                                             timeout=check_command_timeout,
                                             logoutput=False)

    # Strip any non-JSON prefix (e.g. MOTD logging) and parse the status output as JSON
    llap_app_info = make_valid_json(output)

    if llap_app_info is None or 'state' not in llap_app_info:
      result_code = UNKNOWN_STATUS_CODE
      alert_label = CRITICAL_MESSAGE
      return (result_code, [alert_label])
    retrieved_llap_app_state = llap_app_info['state'].upper()

    if retrieved_llap_app_state in ['RUNNING_ALL']:
      result_code = OK_RESULT_CODE
      total_time = time.time() - start_time
      alert_label = OK_MESSAGE.format(llap_app_state_dict.get(retrieved_llap_app_state, retrieved_llap_app_state), total_time)
    elif retrieved_llap_app_state in ['RUNNING_PARTIAL']:
      # Get 'live' and 'desired' instances
      if 'liveInstances' not in llap_app_info or 'desiredInstances' not in llap_app_info:
        result_code = CRITICAL_RESULT_CODE
        total_time = time.time() - start_time
        alert_label = CRITICAL_MESSAGE_WITH_STATE.format(llap_app_state_dict.get(retrieved_llap_app_state, retrieved_llap_app_state), total_time)
        return (result_code, [alert_label])

      live_instances = llap_app_info['liveInstances']
      desired_instances = llap_app_info['desiredInstances']
      if live_instances < 0 or desired_instances <= 0:
        result_code = CRITICAL_RESULT_CODE
        total_time = time.time() - start_time
        alert_label = CRITICAL_MESSAGE_WITH_STATE.format(llap_app_state_dict.get(retrieved_llap_app_state, retrieved_llap_app_state), total_time)
        return (result_code, [alert_label])

      # In the RUNNING_PARTIAL state, report OK only if at least 80% of the desired instances are live.
      percent_desired_instances_to_be_up = 80
      percent_instances_up = float(live_instances) / desired_instances * 100
      if percent_instances_up >= percent_desired_instances_to_be_up:
        result_code = OK_RESULT_CODE
      else:
        result_code = CRITICAL_RESULT_CODE
      total_time = time.time() - start_time
      alert_label = MESSAGE_WITH_STATE_AND_INSTANCES.format(llap_app_state_dict.get(retrieved_llap_app_state, retrieved_llap_app_state),
                                                            total_time,
                                                            live_instances,
                                                            desired_instances)
    else:
      result_code = CRITICAL_RESULT_CODE
      total_time = time.time() - start_time
      alert_label = CRITICAL_MESSAGE_WITH_STATE.format(llap_app_state_dict.get(retrieved_llap_app_state, retrieved_llap_app_state), total_time)
  except Exception:
    alert_label = traceback.format_exc()
    result_code = UNKNOWN_STATUS_CODE

  return (result_code, [alert_label])
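
# Illustrative (hypothetical) local invocation for debugging. In production the Ambari alert
# framework resolves the tokens returned by get_tokens() into the 'configurations' dictionary
# and calls execute() itself; the values below are placeholders, not real cluster settings:
#
#   configs = {
#     '{{cluster-env/security_enabled}}': 'false',
#     '{{hive-env/hive_user}}': 'hive',
#     '{{hive-interactive-env/llap_app_name}}': 'llap0',
#   }
#   print(execute(configurations=configs, host_name='example-host.example.com'))
#   # e.g. -> ('OK', ["The application reported a 'RUNNING' state in 1.234s"])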
"""
Remove extra lines from 'llapstatus' status output (eg: because of MOTD logging) so as to have a valid JSON data to be passed in
to JSON converter.
"""
def make_valid_json(output):
'''
Note: It is assumed right now that extra lines will be only at the start and not at the end.
Sample expected JSON to be passed for 'loads' is either of the form :
Case 'A':
{
"amInfo" : {
"appName" : "llap0",
"appType" : "yarn-service",
"appId" : "APP1",
"containerId" : "container_1466036628595_0010_01_000001",
"hostname" : "hostName",
"amWebUrl" : "http://hostName:port/"
},
"state" : "LAUNCHING",
....
"desiredInstances" : 1,
"liveInstances" : 0,
....
....
}
or
Case 'B':
{
"state" : "APP_NOT_FOUND"
}
'''
  splits = output.split("\n")
  len_splits = len(splits)
  if len_splits < 3:
    raise Fail("Malformed JSON data received from 'llapstatus' command. Exiting ....")

  marker_idx = None  # index of the line at which the JSON data starts
  for idx, split in enumerate(splits):
    curr_elem = split.strip()
    if idx + 2 > len_splits:
      raise Fail(
        "Iterated over the entire received 'llapstatus' output. Couldn't validate the received output for JSON parsing.")
    next_elem = (splits[(idx + 1)]).strip()
    if curr_elem == "{":
      if next_elem == "\"amInfo\" : {" and (splits[len_splits - 1]).strip() == '}':
        # For Case 'A'
        marker_idx = idx
        break
      elif idx + 3 == len_splits and next_elem.startswith('"state" : ') and (splits[idx + 2]).strip() == '}':
        # For Case 'B'
        marker_idx = idx
        break

  if marker_idx is None:
    raise Fail("Couldn't validate the received output for JSON parsing.")

  # Remove extra logging lines that precede the JSON output
  if marker_idx != 0:
    del splits[0:marker_idx]

  scanned_output = '\n'.join(splits)
  llap_app_info = json.loads(scanned_output)
  return llap_app_info
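
# Illustrative (hypothetical) example of the cleanup performed above, assuming the 'llapstatus'
# output is prefixed by MOTD/log lines before the JSON document (Case 'B'):
#
#   raw = "Welcome banner\nSome logging warning\n{\n  \"state\" : \"APP_NOT_FOUND\"\n}"
#   make_valid_json(raw)   # -> {'state': 'APP_NOT_FOUND'}
#
# Output in which no matching '{' ... '}' block is found raises Fail instead.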