blob: 95a83932c1023c1d8a538fa5c6223fa3ce91ebb1 [file] [log] [blame]
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
# Failure injection module for the Impala service. There are two main ways this module
# can be used - the first is to initialize the failure injector and then call start()
# which will kick off a timer that chooses a random impalad/state store process
# to fail each time timer fires.
# The second way this module can be used to to initialize it and call the actions
# directly (ex. kill_random_impalad()). This provides a bit more control over exactly
# when a failure will happen and is useful for targeted test scenarios.
import logging
from random import choice
from threading import Timer
logging.basicConfig(level=logging.INFO, format='%(threadName)s: %(message)s')
LOG = logging.getLogger('failure-injector')
# This class is used for injecting failures for the Impala service.
class FailureInjector(object):
def __init__(self, impala_cluster, failure_frequency, impalad_exclude_list=None):
"""
Initializes the FailureInjector object.
impala_cluster - An ImpalaCluster object (see the impala_cluster module)
failure_frequency - Interval to fire timer (in seconds)
impalad_exclude_list - A list of impalad host:port name to not inject failures
on. Useful to filter out the coordinator.
"""
self.cluster = impala_cluster
self.cluster.get_impala_service().set_process_auto_restart_config(value=True)
# TODO: Do we need to restart the impala service to apply this?
# self.cluster.get_impala_service().restart()
self.failure_frequency = failure_frequency
num_impalad_procs = len(self.cluster.get_impala_service().get_all_impalad_processes())
self.impalad_exclude_list = impalad_exclude_list
# Build a weighted list of possible actions. This is done using a trivial approach
# where we just add the item multiple times (weight value) into the action list.
# TODO: Provide a way to dynamically configure the weights
actions_with_weights = {self.kill_random_impalad: num_impalad_procs * 2,
self.kill_state_store: 1}
self.possible_actions = list()
for key, value in actions_with_weights.items():
self.possible_actions.extend([key] * value)
def start(self):
""" Starts the timer, triggering failures for the specified interval """
self.__start_timer()
def cancel(self):
""" Stops the timer, canceling any additional failures from occurring """
if self.__timer is not None:
self.__timer.cancel()
def kill_random_impalad(self):
""" Kills a randomly selected impalad instance not in the exlude list """
filtered_impalad = \
filter(lambda impalad: '%s:%d' % (impalad.hostname, impalad.be_port)\
not in self.impalad_exclude_list,
self.cluster.get_impala_service().get_all_impalad_processes())
self.kill_impalad(choice(filtered_impalad))
def kill_impalad(self, impalad):
""" Kills the specified impalad instance """
LOG.info('Chose impalad on "%s" to kill' % impalad.hostname)
impalad.kill()
def kill_state_store(self):
""" Kills the statestore process """
state_store = self.cluster.get_impala_service().get_state_store_process()
LOG.info('Chose statestore on "%s" to kill' % state_store.hostname)
state_store.kill()
def __start_timer(self):
""" Starts a new timer, cancelling the previous timer if it is running """
self.cancel()
self.__timer = Timer(self.failure_frequency, self.__choose_action)
self.__timer.start()
def __choose_action(self):
""" Chooses a failure action to perform """
action = choice(self.possible_actions)
LOG.info('Executing action: %s' % action)
action()
self.__start_timer()