| #!/usr/bin/env python |
| """ |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| |
| """ |
| # Python Imports |
| import subprocess |
| import os |
| import re |
| import time |
| import shutil |
| from datetime import datetime |
| import json |
| |
| # Ambari Commons & Resource Management imports |
| from resource_management.libraries.script.script import Script |
| from resource_management.libraries.functions import format |
| from resource_management.libraries.functions.check_process_status import check_process_status |
| from resource_management.core.source import InlineTemplate |
| from resource_management.core.resources.system import Execute, Directory |
| |
| # Imports needed for Rolling/Express Upgrade |
| from resource_management.libraries.functions import StackFeature |
| from resource_management.libraries.functions.stack_features import check_stack_feature |
| from resource_management.libraries.functions import stack_select |
| from resource_management.libraries.functions.copy_tarball import copy_to_hdfs |
| |
| from resource_management.core import shell |
| from resource_management.core.exceptions import Fail |
| from resource_management.core.logger import Logger |
| from ambari_commons import OSCheck, OSConst |
| from ambari_commons.os_family_impl import OsFamilyImpl |
| |
| from resource_management.core.exceptions import ComponentIsNotRunning |
| from resource_management.libraries.functions.decorator import retry |
| from resource_management.libraries.functions.security_commons import build_expectations, \ |
| cached_kinit_executor, get_params_from_filesystem, validate_security_config_properties, \ |
| FILE_TYPE_XML |
| |
| # Local Imports |
| from setup_ranger_hive import setup_ranger_hive |
| from hive_service_interactive import hive_service_interactive |
| from hive_interactive import hive_interactive |
| from hive_server import HiveServerDefault |
| from setup_ranger_hive_interactive import setup_ranger_hive_interactive |
| |
| import traceback |
| |
class HiveServerInteractive(Script):
  """Base Ambari control script for the Hive Server Interactive component.

  Intentionally empty: all real behavior lives in the OS-family specific
  subclasses selected via the @OsFamilyImpl decorators below.
  """
| |
| |
@OsFamilyImpl(os_family=OsFamilyImpl.DEFAULT)
class HiveServerInteractiveDefault(HiveServerInteractive):
  """Default (Linux) lifecycle implementation for Hive Server Interactive (HSI).

  Besides the usual install/configure/start/stop/status hooks, this class
  manages the LLAP application that HSI depends on: packaging and launching it
  through Slider, polling its status via the 'llapstatus' hive service, and
  tearing it down on stop.
  """

  def install(self, env):
    """Install the packages required by Hive Server Interactive."""
    import params
    self.install_packages(env)

  def configure(self, env):
    """Generate the Hive Interactive configuration files."""
    import params
    env.set_params(params)
    hive_interactive(name='hiveserver2')

  def pre_upgrade_restart(self, env, upgrade_type=None):
    """Rolling/Express upgrade hook: select new stack packages and refresh the
    hive2/tez_hive2 tarballs in HDFS before HSI is restarted."""
    Logger.info("Executing Hive Server Interactive Stack Upgrade pre-restart")
    import params
    env.set_params(params)

    if params.version and check_stack_feature(StackFeature.ROLLING_UPGRADE, params.version):
      stack_select.select_packages(params.version)

      # Copy hive.tar.gz and tez.tar.gz used by Hive Interactive to HDFS
      resource_created = copy_to_hdfs(
        "hive2",
        params.user_group,
        params.hdfs_user,
        skip=params.sysprep_skip_copy_tarballs_hdfs)

      resource_created = copy_to_hdfs(
        "tez_hive2",
        params.user_group,
        params.hdfs_user,
        skip=params.sysprep_skip_copy_tarballs_hdfs) or resource_created

      if resource_created:
        # Flush any pending HdfsResource actions accumulated by copy_to_hdfs.
        params.HdfsResource(None, action="execute")

  def start(self, env, upgrade_type=None):
    """Start LLAP first; only if LLAP reaches a running state, set up Ranger
    integration and start Hive Server Interactive itself."""
    import params
    env.set_params(params)
    self.configure(env)

    if params.security_enabled:
      # Do the security setup, internally calls do_kinit()
      self.setup_security()

    # TODO : We need have conditional [re]start of LLAP once "status check command" for LLAP is ready.
    # Check status and based on that decide on [re]starting.

    # Start LLAP before Hive Server Interactive start.
    status = self._llap_start(env)
    if not status:
      # if we couldnt get LLAP in RUNNING or RUNNING_ALL state, stop LLAP process before bailing out.
      self._llap_stop(env)
      raise Fail("Skipping START of Hive Server Interactive since LLAP app couldn't be STARTED.")

    # TODO : test the workability of Ranger and Hive2 during upgrade
    setup_ranger_hive_interactive(upgrade_type=upgrade_type)
    hive_service_interactive('hiveserver2', action='start', upgrade_type=upgrade_type)


  def stop(self, env, upgrade_type=None):
    """Stop Hive Server Interactive, then stop LLAP unless this is a restart
    command (restart keeps LLAP running to avoid an expensive relaunch)."""
    import params
    env.set_params(params)

    if params.security_enabled:
      self.do_kinit()

    # Stop Hive Interactive Server first
    hive_service_interactive('hiveserver2', action='stop')

    if not params.is_restart_command:
      self._llap_stop(env)
    else:
      Logger.info("LLAP stop is skipped as its a restart command")

  def status(self, env):
    """Status check: verify the process recorded in the HSI PID file is alive."""
    import status_params
    env.set_params(status_params)

    # We are not doing 'llap' status check done here as part of status check for 'HSI', as 'llap' status
    # check is a heavy weight operation.

    pid_file = format("{hive_pid_dir}/{hive_interactive_pid}")
    # Verify the HSI pid file points at a live process.
    check_process_status(pid_file)

  def restart_llap(self, env):
    """
    Custom command to Restart LLAP
    """
    Logger.info("Custom Command to retart LLAP")
    import params
    env.set_params(params)

    if params.security_enabled:
      self.do_kinit()

    self._llap_stop(env)
    self._llap_start(env)

  def _llap_stop(self, env):
    """Stop the LLAP Slider application, then destroy its app/registry dirs."""
    import params
    Logger.info("Stopping LLAP")

    stop_cmd = ["slider", "stop", params.llap_app_name]

    code, output, error = shell.call(stop_cmd, user=params.hive_user, stderr=subprocess.PIPE, logoutput=True)
    if code == 0:
      Logger.info(format("Stopped {params.llap_app_name} application on Slider successfully"))
    elif code == 69 and output is not None and "Unknown application instance" in output:
      # Slider exits 69 with this message when the app instance does not exist;
      # treat "already stopped" as success.
      Logger.info(format("Application {params.llap_app_name} was already stopped on Slider"))
    else:
      raise Fail(format("Could not stop application {params.llap_app_name} on Slider. {error}\n{output}"))

    # Will exit with code 4 if need to run with "--force" to delete directories and registries.
    Execute(('slider', 'destroy', params.llap_app_name, "--force"),
            user=params.hive_user,
            timeout=30,
            ignore_failures=True,
    )

  """
  Controls the start of LLAP.
  """
  def _llap_start(self, env, cleanup=False):
    """Package and launch the LLAP Slider application, then wait for it to
    reach a running state. Returns True on success, False otherwise.

    cleanup: when True, delete the generated package directory if the launch
    attempt raises.
    """
    import params
    env.set_params(params)

    if params.hive_server_interactive_ha:
      """
      Check llap app state
      """
      # With HSI HA another HSI instance may already have LLAP up; skip the
      # (re)launch in that case.
      Logger.info("HSI HA is enabled. Checking if LLAP is already running ...")
      if params.stack_supports_hive_interactive_ga:
        status = self.check_llap_app_status_in_llap_ga(params.llap_app_name, 2, params.hive_server_interactive_ha)
      else:
        status = self.check_llap_app_status_in_llap_tp(params.llap_app_name, 2, params.hive_server_interactive_ha)

      if status:
        Logger.info("LLAP app '{0}' is already running.".format(params.llap_app_name))
        return True
      else:
        Logger.info("LLAP app '{0}' is not running. llap will be started.".format(params.llap_app_name))
      pass

    # Call for cleaning up the earlier run(s) LLAP package folders.
    self._cleanup_past_llap_package_dirs()

    Logger.info("Starting LLAP")
    LLAP_PACKAGE_CREATION_PATH = Script.get_tmp_dir()

    # Unique per-run package dir name, e.g. llap-slider2016-04-05_10-22-33.
    unique_name = "llap-slider%s" % datetime.utcnow().strftime('%Y-%m-%d_%H-%M-%S')

    cmd = format("{stack_root}/current/hive-server2-hive2/bin/hive --service llap --slider-am-container-mb {params.slider_am_container_mb} "
                 "--size {params.llap_daemon_container_size}m --cache {params.hive_llap_io_mem_size}m --xmx {params.llap_heap_size}m "
                 "--loglevel {params.llap_log_level} {params.llap_extra_slider_opts} --output {LLAP_PACKAGE_CREATION_PATH}/{unique_name}")

    # Append params that are supported from Hive llap GA version.
    if params.stack_supports_hive_interactive_ga:
      # Figure out the Slider Anti-affinity to be used.
      # YARN does not support anti-affinity, and therefore Slider implements AA by the means of exclusion lists, i.e, it
      # starts containers one by one and excludes the nodes it gets (adding a delay of ~2sec./machine). When the LLAP
      # container memory size configuration is more than half of YARN node memory, AA is implicit and should be avoided.
      slider_placement = 4
      if long(params.llap_daemon_container_size) > (0.5 * long(params.yarn_nm_mem)):
        slider_placement = 0
        Logger.info("Setting slider_placement : 0, as llap_daemon_container_size : {0} > 0.5 * "
                    "YARN NodeManager Memory({1})".format(params.llap_daemon_container_size, params.yarn_nm_mem))
      else:
        Logger.info("Setting slider_placement: 4, as llap_daemon_container_size : {0} <= 0.5 * "
                    "YARN NodeManager Memory({1})".format(params.llap_daemon_container_size, params.yarn_nm_mem))
      cmd += format(" --slider-placement {slider_placement} --skiphadoopversion --skiphbasecp --instances {params.num_llap_daemon_running_nodes}")

      # Setup the logger for the ga version only
      cmd += format(" --logger {params.llap_logger}")
    else:
      cmd += format(" --instances {params.num_llap_nodes}")
    if params.security_enabled:
      llap_keytab_splits = params.hive_llap_keytab_file.split("/")
      Logger.debug("llap_keytab_splits : {0}".format(llap_keytab_splits))
      cmd += format(" --slider-keytab-dir .slider/keytabs/{params.hive_user}/ --slider-keytab "
                    "{llap_keytab_splits[4]} --slider-principal {params.hive_llap_principal}")

    # Add the aux jars if they are specified. If empty, dont need to add this param.
    if params.hive_aux_jars:
      cmd+= format(" --auxjars {params.hive_aux_jars}")

    # Append args.
    llap_java_args = InlineTemplate(params.llap_app_java_opts).get_content()
    cmd += format(" --args \" {llap_java_args}\"")
    # Append metaspace size to args.
    # NOTE(review): the slider_placement branch above converts
    # llap_daemon_container_size via long() before comparing, which suggests it
    # may be a string here; in Python 2 a str-vs-int comparison does not compare
    # numerically, so these two conditions may not behave as intended — verify.
    if params.java_version > 7 and params.llap_daemon_container_size > 4096:
      if params.llap_daemon_container_size <= 32768:
        metaspaceSize = "256m"
      else:
        metaspaceSize = "1024m"
      # Strip the trailing closing quote of the --args value and re-append it
      # after the metaspace flag, so the flag lands inside the quoted args.
      cmd = cmd[:-1] + " -XX:MetaspaceSize="+metaspaceSize+ "\""

    run_file_path = None
    try:
      Logger.info(format("LLAP start command: {cmd}"))
      code, output, error = shell.checked_call(cmd, user=params.hive_user, quiet = True, stderr=subprocess.PIPE, logoutput=True)

      if code != 0 or output is None:
        raise Fail("Command failed with either non-zero return code or no output.")

      # E.g., output:
      # Prepared llap-slider-05Apr2016/run.sh for running LLAP on Slider
      exp = r"Prepared (.*?run.sh) for running LLAP"
      run_file_path = None
      out_splits = output.split("\n")
      for line in out_splits:
        line = line.strip()
        m = re.match(exp, line, re.I)
        if m and len(m.groups()) == 1:
          run_file_name = m.group(1)
          run_file_path = os.path.join(params.hive_user_home_dir, run_file_name)
          break
      if not run_file_path:
        raise Fail("Did not find run.sh file in output: " + str(output))

      Logger.info(format("Run file path: {run_file_path}"))
      # run.sh actually submits the packaged LLAP application to Slider.
      Execute(run_file_path, user=params.hive_user, logoutput=True)
      Logger.info("Submitted LLAP app name : {0}".format(params.llap_app_name))

      # We need to check the status of LLAP app to figure out it got
      # launched properly and is in running state. Then go ahead with Hive Interactive Server start.
      if params.stack_supports_hive_interactive_ga:
        status = self.check_llap_app_status_in_llap_ga(params.llap_app_name, params.num_retries_for_checking_llap_status)
      else:
        status = self.check_llap_app_status_in_llap_tp(params.llap_app_name, params.num_retries_for_checking_llap_status)
      if status:
        Logger.info("LLAP app '{0}' deployed successfully.".format(params.llap_app_name))
        return True
      else:
        Logger.error("LLAP app '{0}' deployment unsuccessful.".format(params.llap_app_name))
        return False
    except:
      # Attempt to clean up the packaged application, or potentially rename it with a .bak
      if run_file_path is not None and cleanup:
        parent_dir = os.path.dirname(run_file_path)
        Directory(parent_dir,
                  action = "delete",
                  ignore_failures = True,
        )

      # throw the original exception
      raise

  """
  Checks and deletes previous run 'LLAP package' folders, ignoring three latest packages.
  Last three are are ignore for debugging/reference purposes.
  Helps in keeping check on disk space used.
  """
  def _cleanup_past_llap_package_dirs(self):
    """Best-effort deletion of old llap-slider* package dirs under the agent
    tmp dir, keeping the three most recent for debugging. Never raises."""
    try:
      import params
      Logger.info("Determining previous run 'LLAP package' folder(s) to be deleted ....")
      llap_package_folder_name_prefix = "llap-slider" # Package name is like : llap-sliderYYYY-MM-DD-HH:MM:SS
      num_folders_to_retain = 3  # Hardcoding it as of now, as no considerable use was found to provide an env param.
      file_names = [dir_name for dir_name in os.listdir(Script.get_tmp_dir())
                    if dir_name.startswith(llap_package_folder_name_prefix)]

      # Names embed a sortable timestamp, so a lexical sort is chronological;
      # dropping the last N entries retains the N newest on disk.
      file_names.sort()
      del file_names[-num_folders_to_retain:] # Ignore 'num_folders_to_retain' latest package folders.
      Logger.info("Previous run 'LLAP package' folder(s) to be deleted = {0}".format(file_names))

      if file_names:
        for path in file_names:
          abs_path = Script.get_tmp_dir()+"/"+path
          Directory(abs_path,
                    action = "delete",
                    ignore_failures = True
          )
      else:
        Logger.info("No '{0}*' folder deleted.".format(llap_package_folder_name_prefix))
    except:
      Logger.exception("Exception while doing cleanup for past 'LLAP package(s)':")



  """
  Does kinit and copies keytab for Hive/LLAP to HDFS.
  """
  def setup_security(self):
    """kinit as the hive user, then install the LLAP keytab into HDFS via Slider."""
    import params

    self.do_kinit()

    # Copy params.hive_llap_keytab_file to hdfs://<host>:<port>/user/<hive_user>/.slider/keytabs/<hive_user> , required by LLAP
    slider_keytab_install_cmd = format("slider install-keytab --keytab {params.hive_llap_keytab_file} --folder {params.hive_user} --overwrite")
    Execute(slider_keytab_install_cmd, user=params.hive_user)

  def do_kinit(self):
    """Obtain a Kerberos ticket for the hive principal as the hive user."""
    import params

    hive_interactive_kinit_cmd = format("{kinit_path_local} -kt {params.hive_server2_keytab} {params.hive_principal}; ")
    Execute(hive_interactive_kinit_cmd, user=params.hive_user)

  """
  Get llap app status data for LLAP Tech Preview code base.
  """
  def _get_llap_app_status_info_in_llap_tp(self, app_name):
    """Run 'llapstatus' (TP variant, single non-blocking poll) and return the
    parsed JSON dict. Raises Fail on a non-zero exit code."""
    import status_params
    # findAppTimeout 0 => do not wait for the app to appear; report current state.
    LLAP_APP_STATUS_CMD_TIMEOUT = 0

    llap_status_cmd = format("{stack_root}/current/hive-server2-hive2/bin/hive --service llapstatus --name {app_name} --findAppTimeout {LLAP_APP_STATUS_CMD_TIMEOUT}")
    code, output, error = shell.checked_call(llap_status_cmd, user=status_params.hive_user, stderr=subprocess.PIPE,
                                             logoutput=False)
    Logger.info("Received 'llapstatus' command 'output' : {0}".format(output))
    if code == 0:
      return self._make_valid_json(output)
    else:
      # NOTE(review): Logger.info is typically a single-argument API; these
      # two-argument calls may drop or mishandle the second value — verify
      # against the resource_management Logger before relying on these logs.
      Logger.info("'LLAP status command' output : ", output)
      Logger.info("'LLAP status command' error : ", error)
      Logger.info("'LLAP status command' exit code : ", code)
      raise Fail("Error getting LLAP app status. ")

  """
  Get llap app status data for LLAP GA code base.

  Parameters: 'percent_desired_instances_to_be_up' : A value b/w 0.0 and 1.0.
              'total_timeout' : Total wait time while checking the status via llapstatus command
              'refresh_rate' : Frequency of polling for llapstatus.
  """
  def _get_llap_app_status_info_in_llap_ga(self, percent_desired_instances_to_be_up, total_timeout, refresh_rate):
    """Run 'llapstatus' in watch mode (GA variant) and return the parsed JSON
    dict. The command itself blocks up to total_timeout seconds."""
    import status_params

    # llapstatus comamnd : llapstatus -w -r <percent containers to wait for to be Up> -i <refresh_rate> -t <total timeout for this comand>
    # -w : Watch mode waits until all LLAP daemons are running or subset of the nodes are running (threshold can be specified via -r option) (Default wait until all nodes are running)
    # -r : When watch mode is enabled (-w), wait until the specified threshold of nodes are running (Default 1.0 which means 100% nodes are running)
    # -i : Amount of time in seconds to wait until subsequent status checks in watch mode (Default: 1sec)
    # -t : Exit watch mode if the desired state is not attained until the specified timeout (Default: 300sec)
    #
    # example : llapstatus -w -r 0.8 -i 2 -t 150
    llap_status_cmd = format("{stack_root}/current/hive-server2-hive2/bin/hive --service llapstatus -w -r {percent_desired_instances_to_be_up} -i {refresh_rate} -t {total_timeout}")
    Logger.info("\n\n\n\n\n");
    Logger.info("LLAP status command : {0}".format(llap_status_cmd))
    code, output, error = shell.checked_call(llap_status_cmd, user=status_params.hive_user, quiet=True, stderr=subprocess.PIPE,
                                             logoutput=True)

    if code == 0:
      return self._make_valid_json(output)
    else:
      # NOTE(review): same two-argument Logger.info concern as in the TP variant.
      Logger.info("'LLAP status command' output : ", output)
      Logger.info("'LLAP status command' error : ", error)
      Logger.info("'LLAP status command' exit code : ", code)
      raise Fail("Error getting LLAP app status. ")




  """
  Remove extra lines (begginning/end) from 'llapstatus' status output (eg: because of MOTD logging) so as to have
  a valid JSON data to be passed in to JSON converter.
  """
  def _make_valid_json(self, output):
    '''

    Note: Extra lines (eg: because of MOTD) may be at the start or the end (some other logging getting appended)
    of the passed-in data.

    Sample expected JSON to be passed for 'loads' is either of the form :

    Case 'A':
    {
        "amInfo" : {
        "appName" : "llap0",
        "appType" : "org-apache-slider",
        "appId" : "APP1",
        "containerId" : "container_1466036628595_0010_01_000001",
        "hostname" : "hostName",
        "amWebUrl" : "http://hostName:port/"
      },
      "state" : "LAUNCHING",
      ....
      "desiredInstances" : 1,
      "liveInstances" : 0,
      ....
      ....
    }

    or

    Case 'B':
    {
      "state" : "APP_NOT_FOUND"
    }

    '''
    splits = output.split("\n")

    len_splits = len(splits)
    if (len_splits < 3):
      raise Fail ("Malformed JSON data received from 'llapstatus' command. Exiting ....")

    # Firstly, remove extra lines from the END.
    updated_splits = []
    for itr, line in enumerate(reversed(splits)):
      if line == "}": # Our assumption of end of JSON data.
        # NOTE(review): when the very last line is "}" (itr == 0), splits[:-0]
        # is the empty list; the len() guard below then keeps 'splits'
        # unchanged, which happens to be the desired outcome for that case.
        updated_splits = splits[:-itr]
        break

    if len(updated_splits) > 0:
      splits = updated_splits
      len_splits = len(splits)


    # Secondly, remove extra lines from the BEGINNING.
    marker_idx = None # To detect where from to start reading for JSON data
    for idx, split in enumerate(splits):
      curr_elem = split.strip()
      if idx+2 > len_splits:
        raise Fail("Iterated over the received 'llapstatus' comamnd. Couldn't validate the received output for JSON parsing.")
      next_elem = (splits[(idx + 1)]).strip()
      if curr_elem == "{":
        if next_elem == "\"amInfo\" : {" and (splits[len_splits-1]).strip() == '}':
          # For Case 'A'
          marker_idx = idx
          break;
        elif idx+3 == len_splits and next_elem.startswith('"state" : ') and (splits[idx + 2]).strip() == '}':
          # For Case 'B'
          marker_idx = idx
          break;


    # Remove extra logging from possible JSON output
    if marker_idx is None:
      raise Fail("Couldn't validate the received output for JSON parsing.")
    else:
      if marker_idx != 0:
        del splits[0:marker_idx]

    scanned_output = '\n'.join(splits)
    llap_app_info = json.loads(scanned_output)
    return llap_app_info


  """
  Checks llap app status. The states can be : 'COMPLETE', 'APP_NOT_FOUND', 'RUNNING_PARTIAL', 'RUNNING_ALL' & 'LAUNCHING'.

  if app is in 'APP_NOT_FOUND', 'RUNNING_PARTIAL' and 'LAUNCHING' state:
       we wait for 'num_times_to_wait' to have app in (1). 'RUNNING_ALL' or (2). 'RUNNING_PARTIAL'
       state with 80% or more 'desiredInstances' running and Return True
  else :
       Return False

  Parameters: llap_app_name : deployed llap app name.
              num_retries :   Number of retries to check the LLAP app status.
  """
  def check_llap_app_status_in_llap_tp(self, llap_app_name, num_retries, return_immediately_if_stopped=False):
    """Poll LLAP status (TP variant) with retries; returns True when the app
    is sufficiently up, False on timeout/error. num_retries is clamped to [2, 20]."""
    curr_time = time.time()
    num_retries = int(num_retries)
    if num_retries <= 0:
      Logger.info("Read 'num_retries' as : {0}. Setting it to : {1}".format(num_retries, 2))
      num_retries = 2
    if num_retries > 20:
      Logger.info("Read 'num_retries' as : {0}. Setting it to : {1}".format(num_retries, 20))
      num_retries = 20

    # _verify_llap_app_status raises Fail for transient states; @retry re-polls
    # up to num_retries times with a 2s sleep between attempts.
    @retry(times=num_retries, sleep_time=2, err_class=Fail)
    def do_retries():
      llap_app_info = self._get_llap_app_status_info_in_llap_tp(llap_app_name)
      return self._verify_llap_app_status(llap_app_info, llap_app_name, return_immediately_if_stopped, curr_time)

    try:
      status = do_retries()
      return status
    except Exception, e:
      Logger.info("LLAP app '{0}' did not come up after a wait of {1} seconds.".format(llap_app_name,
                                                                                      time.time() - curr_time))
      traceback.print_exc()
      return False

  def check_llap_app_status_in_llap_ga(self, llap_app_name, num_retries, return_immediately_if_stopped=False):
    """Check LLAP status (GA variant): one blocking 'llapstatus -w' call whose
    timeout is derived from num_retries; returns True/False."""
    curr_time = time.time()
    total_timeout = int(num_retries) * 20; # Total wait time while checking the status via llapstatus command
    Logger.debug("Calculated 'total_timeout' : {0} using config 'num_retries_for_checking_llap_status' : {1}".format(total_timeout, num_retries))
    refresh_rate = 2 # Frequency of checking the llapstatus
    percent_desired_instances_to_be_up = 80 # Out of 100.
    # NOTE(review): this call sits outside the try below, so a failing
    # llapstatus command propagates an exception instead of returning False —
    # confirm that is intended (the TP variant catches everything).
    llap_app_info = self._get_llap_app_status_info_in_llap_ga(percent_desired_instances_to_be_up/100.0, total_timeout, refresh_rate)

    try:
      return self._verify_llap_app_status(llap_app_info, llap_app_name, return_immediately_if_stopped, curr_time)
    except Exception as e:
      # NOTE(review): 'e.message' is deprecated Python 2 behavior and may be
      # empty for some exception types — verify.
      Logger.info(e.message)
      return False

  def get_log_folder(self):
    """Return the Hive log directory (used by generic Script tooling)."""
    import params
    return params.hive_log_dir

  def get_user(self):
    """Return the OS user that owns the Hive processes."""
    import params
    return params.hive_user

  def _verify_llap_app_status(self, llap_app_info, llap_app_name, return_immediately_if_stopped, curr_time):
    """Interpret the parsed 'llapstatus' JSON.

    Returns True for RUNNING_ALL, or RUNNING_PARTIAL with >= 80% of desired
    instances live. Returns False for malformed data, unknown states, zero
    desired instances, or stopped states when return_immediately_if_stopped is
    set. Raises Fail for transient/insufficient states so the caller's retry
    machinery can re-poll.
    """
    if llap_app_info is None or 'state' not in llap_app_info:
      Logger.error("Malformed JSON data received for LLAP app. Exiting ....")
      return False

    # counters based on various states.
    live_instances = 0
    desired_instances = 0
    percent_desired_instances_to_be_up = 80 # Used in 'RUNNING_PARTIAL' state.
    if return_immediately_if_stopped and (llap_app_info['state'].upper() in ('APP_NOT_FOUND', 'COMPLETE')):
      return False
    if llap_app_info['state'].upper() == 'RUNNING_ALL':
      Logger.info(
        "LLAP app '{0}' in '{1}' state.".format(llap_app_name, llap_app_info['state']))
      return True
    elif llap_app_info['state'].upper() == 'RUNNING_PARTIAL':
      # Check how many instances were up.
      if 'liveInstances' in llap_app_info and 'desiredInstances' in llap_app_info:
        live_instances = llap_app_info['liveInstances']
        desired_instances = llap_app_info['desiredInstances']
      else:
        Logger.info(
          "LLAP app '{0}' is in '{1}' state, but 'instances' information not available in JSON received. " \
          "Exiting ....".format(llap_app_name, llap_app_info['state']))
        Logger.info(llap_app_info)
        return False
      if desired_instances == 0:
        Logger.info("LLAP app '{0}' desired instance are set to 0. Exiting ....".format(llap_app_name))
        return False

      percentInstancesUp = 0
      if live_instances > 0:
        percentInstancesUp = float(live_instances) / desired_instances * 100
      if percentInstancesUp >= percent_desired_instances_to_be_up:
        Logger.info("LLAP app '{0}' in '{1}' state. Live Instances : '{2}'  >= {3}% of Desired Instances : " \
                    "'{4}'.".format(llap_app_name, llap_app_info['state'],
                                    llap_app_info['liveInstances'],
                                    percent_desired_instances_to_be_up,
                                    llap_app_info['desiredInstances']))
        return True
      else:
        Logger.info("LLAP app '{0}' in '{1}' state. Live Instances : '{2}'. Desired Instances : " \
                    "'{3}' after {4} secs.".format(llap_app_name, llap_app_info['state'],
                                                   llap_app_info['liveInstances'],
                                                   llap_app_info['desiredInstances'],
                                                   time.time() - curr_time))
        raise Fail("App state is RUNNING_PARTIAL. Live Instances : '{0}', Desired Instance : '{1}'".format(llap_app_info['liveInstances'],
                                                                                                           llap_app_info['desiredInstances']))
    elif llap_app_info['state'].upper() in ['APP_NOT_FOUND', 'LAUNCHING', 'COMPLETE']:
      # Transient (or not-yet-visible) state: raise so the retry logic polls again.
      status_str = format("LLAP app '{0}' current state is {1}.".format(llap_app_name, llap_app_info['state']))
      Logger.info(status_str)
      raise Fail(status_str)
    else:  # Covers any unknown that we get.
      Logger.info(
        "LLAP app '{0}' current state is '{1}'. Expected : 'RUNNING'.".format(llap_app_name, llap_app_info['state']))
      return False
| |
@OsFamilyImpl(os_family=OSConst.WINSRV_FAMILY)
class HiveServerInteractiveWindows(HiveServerInteractive):
  """Windows-family variant of Hive Server Interactive."""

  def status(self, env):
    """Status check is a no-op on the Windows OS family."""
    pass
| |
# Ambari agent entry point: dispatches the requested command (install/start/
# stop/status/...) to the OS-family specific implementation selected above.
if __name__ == "__main__":
  HiveServerInteractive().execute()