#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import logging
import signal
import json
import sys
import os
import time
import threading
import urllib2
import pprint
import math
from random import randint
from AgentConfig import AgentConfig
from AgentToggleLogger import AgentToggleLogger
from Heartbeat import Heartbeat
from Register import Register
from ActionQueue import ActionQueue
from NetUtil import NetUtil
from Registry import Registry
import ssl
import ProcessHelper
import Constants
import security
logger = logging.getLogger()
AGENT_AUTO_RESTART_EXIT_CODE = 77
HEART_BEAT_RETRY_THRESHOLD = 2
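# The agent registers and heartbeats against the Slider AM's agent REST
# endpoints. The per-container URLs are assembled from the fragments below,
# e.g. https://<am-host>:<secured_port>/ws/v1/slider/agents/<label>/register
# and https://<am-host>:<secured_port>/ws/v1/slider/agents/<label>/heartbeat.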
WS_AGENT_CONTEXT_ROOT = '/ws'
SLIDER_PATH_AGENTS = WS_AGENT_CONTEXT_ROOT + '/v1/slider/agents/'
SLIDER_REL_PATH_REGISTER = '/register'
SLIDER_REL_PATH_HEARTBEAT = '/heartbeat'
class State:
INIT, INSTALLING, INSTALLED, STARTING, STARTED, FAILED = range(6)
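# Component lifecycle as driven by the Controller: the expected state advances
# INIT -> INSTALLED -> STARTED as INSTALL/START commands arrive from the AM,
# while the actual state passes through INSTALLING/STARTING while a command is
# in flight, catches up on success and drops to FAILED when a command fails
# (see updateStateBasedOnCommand and updateStateBasedOnResult below).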
class Controller(threading.Thread):
def __init__(self, config, range=30):
threading.Thread.__init__(self)
logger.debug('Initializing Controller RPC thread.')
self.lock = threading.Lock()
self.safeMode = True
self.credential = None
self.config = config
self.label = config.getLabel()
self.hostname = config.get(AgentConfig.SERVER_SECTION, 'hostname')
self.secured_port = config.get(AgentConfig.SERVER_SECTION, 'secured_port')
self.server_url = 'https://' + self.hostname + ':' + self.secured_port
self.registerUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + SLIDER_REL_PATH_REGISTER
self.heartbeatUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + SLIDER_REL_PATH_HEARTBEAT
self.netutil = NetUtil()
self.responseId = -1
self.repeatRegistration = False
self.isRegistered = False
self.cachedconnect = None
self.range = range
self.hasMappedComponents = True
# Event is used for synchronizing heartbeat iterations (to make possible
# manual wait() interruption between heartbeats )
self.heartbeat_wait_event = threading.Event()
# List of callbacks that are called at agent registration
self.registration_listeners = []
self.componentExpectedState = State.INIT
self.componentActualState = State.INIT
self.statusCommand = None
self.failureCount = 0
self.heartBeatRetryCount = 0
self.autoRestartFailures = 0
self.autoRestartTrackingSince = 0
self.terminateAgent = False
self.stopCommand = None
self.appGracefulStopQueued = False
self.appGracefulStopTriggered = False
self.tags = ""
def __del__(self):
logger.info("Server connection disconnected.")
pass
def processDebugCommandForRegister(self):
self.processDebugCommand(Constants.DO_NOT_REGISTER)
pass
def processDebugCommandForHeartbeat(self):
self.processDebugCommand(Constants.DO_NOT_HEARTBEAT)
pass
def processDebugCommand(self, command):
if self.config.isDebugEnabled() and self.config.debugCommand() == command:
## Test support - sleep for 10 minutes
logger.info("Received debug command: "
+ self.config.debugCommand() + " Sleeping for 10 minutes")
time.sleep(60*10)
pass
pass
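# Registration POSTs a JSON document built by Register.build() carrying the
# current/expected component states, ports allocated by the container, log
# folder locations, tags and the last response id. A rough illustration of the
# payload shape (field names here are only indicative, not the exact
# Register.build() output):
#   {"actualState": 0, "expectedState": 0, "allocatedPorts": {},
#    "logFolders": {}, "tags": "", "responseId": -1}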
def registerWithServer(self):
id = -1
ret = {}
self.processDebugCommandForRegister()
while not self.isRegistered:
try:
data = json.dumps(self.register.build(
self.componentActualState,
self.componentExpectedState,
self.actionQueue.customServiceOrchestrator.allocated_ports,
self.actionQueue.customServiceOrchestrator.log_folders,
self.tags,
id))
logger.info("Registering with the server at " + self.registerUrl +
" with data " + pprint.pformat(data))
response = self.sendRequest(self.registerUrl, data)
regResp = json.loads(response)
exitstatus = 0
# exitstatus is a code of error which was raised on server side.
# exitstatus = 0 (OK - Default)
# exitstatus = 1 (Registration failed because
# different version of agent and server)
if 'exitstatus' in regResp.keys():
exitstatus = int(regResp['exitstatus'])
# log - message that will be printed to the agent's log
if 'log' in regResp.keys():
log = regResp['log']
# container may be associated with tags
if 'tags' in regResp.keys():
self.tags = regResp['tags']
if exitstatus == 1:
logger.error(log)
self.isRegistered = False
self.repeatRegistration = False
return regResp
logger.info("Registered with the server with " + pprint.pformat(regResp))
print("Registered with the server")
self.responseId = int(regResp['responseId'])
self.isRegistered = True
if 'statusCommands' in regResp.keys():
logger.info("Got status commands on registration " + pprint.pformat(
regResp['statusCommands']))
self.addToQueue(regResp['statusCommands'])
pass
else:
self.hasMappedComponents = False
pass
except ssl.SSLError:
self.repeatRegistration = False
self.isRegistered = False
return
except Exception, err:
# try a reconnect only after a certain amount of random time
delay = randint(0, self.range)
logger.info("Unable to connect to: " + self.registerUrl, exc_info=True)
""" Sleeping for {0} seconds and then retrying again """.format(delay)
time.sleep(delay)
pass
pass
return regResp
def addToQueue(self, commands):
"""Add to the queue for running the commands """
""" Put the required actions into the Queue """
if not commands:
logger.debug("No commands from the server : " + pprint.pformat(commands))
else:
"""Only add to the queue if not empty list """
self.actionQueue.put(commands)
pass
# For testing purposes
DEBUG_HEARTBEAT_RETRIES = 0
DEBUG_SUCCESSFULL_HEARTBEATS = 0
DEBUG_STOP_HEARTBEATING = False
MAX_FAILURE_COUNT_TO_STOP = 2
def shouldStopAgent(self):
'''
Stop the agent if:
- Component has failed after start
- AM sent terminate agent command
'''
shouldStopAgent = False
if (self.componentActualState == State.FAILED) \
and (self.componentExpectedState == State.STARTED) \
and (self.failureCount >= Controller.MAX_FAILURE_COUNT_TO_STOP):
logger.info("Component instance has stopped, stopping the agent ...")
shouldStopAgent = True
if self.terminateAgent:
logger.info("Terminate agent command received from AM, stopping the agent ...")
shouldStopAgent = True
return shouldStopAgent
def isAppGracefullyStopped(self):
'''
If an app graceful stop command was queued then it is considered stopped if:
- app stop was triggered
Note: We should enhance this method by checking if the app is stopped
successfully and if not, then take alternate measures (like kill
processes). For now if stop is triggered it is considered stopped.
'''
isAppStopped = False
if self.appGracefulStopTriggered:
isAppStopped = True
return isAppStopped
def stopApp(self):
'''
Stop the app if:
- the app is currently in STARTED state and
a valid stop command is provided
'''
if (self.componentActualState == State.STARTED) and (self.stopCommand is not None):
# Try to do graceful stop
self.addToQueue([self.stopCommand])
self.appGracefulStopQueued = True
logger.info("Attempting to gracefully stop the application ...")
def heartbeatWithServer(self):
self.DEBUG_HEARTBEAT_RETRIES = 0
self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
retry = False
certVerifFailed = False
self.processDebugCommandForHeartbeat()
while not self.DEBUG_STOP_HEARTBEATING:
commandResult = {}
try:
if self.appGracefulStopQueued and not self.isAppGracefullyStopped():
# Continue to wait until app is stopped
continue
if self.shouldStopAgent():
ProcessHelper.stopAgent()
if not retry:
data = json.dumps(
self.heartbeat.build(commandResult,
self.responseId, self.hasMappedComponents))
self.updateStateBasedOnResult(commandResult)
logger.debug("Sending request: " + data)
pass
else:
self.DEBUG_HEARTBEAT_RETRIES += 1
response = self.sendRequest(self.heartbeatUrl, data)
response = json.loads(response)
logger.debug('Got server response: ' + pprint.pformat(response))
serverId = int(response['responseId'])
if 'restartAgent' in response.keys():
restartAgent = response['restartAgent']
if restartAgent:
logger.error("Got restartAgent command")
self.restartAgent()
if 'terminateAgent' in response.keys():
self.terminateAgent = response['terminateAgent']
if self.terminateAgent:
logger.error("Got terminateAgent command")
self.stopApp()
# Let the finally block add the usual wait before the next heartbeat
continue
restartEnabled = False
if 'restartEnabled' in response:
restartEnabled = response['restartEnabled']
if restartEnabled:
logger.debug("Component auto-restart is enabled.")
if 'hasMappedComponents' in response.keys():
self.hasMappedComponents = response['hasMappedComponents'] != False
if 'registrationCommand' in response.keys():
# Skip if the registration command is None
if response['registrationCommand'] is not None:
logger.info(
"RegistrationCommand received - repeat agent registration")
self.isRegistered = False
self.repeatRegistration = True
return
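# The server acknowledges each heartbeat by replying with responseId + 1; any
# other value means agent and server are out of sync, so the agent restarts
# itself to force a clean re-registration.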
if serverId != self.responseId + 1:
logger.error("Error in responseId sequence expected " + str(self.responseId + 1)
+ " but got " + str(serverId) + " - restarting")
self.restartAgent()
else:
self.responseId = serverId
commandSentFromAM = False
if 'executionCommands' in response.keys():
self.updateStateBasedOnCommand(response['executionCommands'])
self.addToQueue(response['executionCommands'])
commandSentFromAM = True
pass
if 'statusCommands' in response.keys() and len(response['statusCommands']) > 0:
self.addToQueue(response['statusCommands'])
commandSentFromAM = True
pass
if not commandSentFromAM:
logger.info("No commands sent from the Server.")
pass
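# If the component failed after a successful start and the AM has enabled
# auto-restart, replay the stored START command (with a fresh taskId) rather
# than waiting for the AM, subject to the failure window in shouldAutoRestart().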
# Add a start command
if self.componentActualState == State.FAILED and \
self.componentExpectedState == State.STARTED and restartEnabled:
stored_command = self.actionQueue.customServiceOrchestrator.stored_command
if len(stored_command) > 0:
auto_start_command = self.create_start_command(stored_command)
if auto_start_command and self.shouldAutoRestart():
logger.info("Automatically adding a start command.")
logger.debug("Auto start command: " + pprint.pformat(auto_start_command))
self.updateStateBasedOnCommand([auto_start_command], False)
self.addToQueue([auto_start_command])
pass
# Add a status command
if (self.componentActualState != State.STARTING and \
self.componentExpectedState == State.STARTED) and \
self.statusCommand is not None:
self.addToQueue([self.statusCommand])
if retry:
print("Reconnected to the server")
logger.info("Reconnected to the server")
retry = False
certVerifFailed = False
self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
self.DEBUG_HEARTBEAT_RETRIES = 0
self.heartbeat_wait_event.clear()
except ssl.SSLError:
self.repeatRegistration = False
self.isRegistered = False
return
except Exception, err:
# Randomize the delay before retrying the heartbeat
delay = randint(0, self.range)
time.sleep(delay)
if "code" in err:
logger.error(err.code)
else:
logger.error(
"Unable to connect to: " + self.heartbeatUrl + " due to " + str(
err))
logger.debug("Details: " + str(err), exc_info=True)
if not retry:
print("Connection to the server was lost. Reconnecting...")
if 'certificate verify failed' in str(err) and not certVerifFailed:
print(
"Server certificate verify failed. Did you regenerate server certificate?")
certVerifFailed = True
self.heartBeatRetryCount += 1
logger.error(
"Heartbeat retry count = %d" % (self.heartBeatRetryCount))
# Re-read zk registry in case AM was restarted and came up with new
# host/port, but do this only after heartbeat retry attempts crosses
# threshold
if self.heartBeatRetryCount > HEART_BEAT_RETRY_THRESHOLD:
self.isRegistered = False
self.repeatRegistration = True
self.heartBeatRetryCount = 0
self.cachedconnect = None # Previous connection is broken now
zk_quorum = self.config.get(AgentConfig.SERVER_SECTION, Constants.ZK_QUORUM)
zk_reg_path = self.config.get(AgentConfig.SERVER_SECTION, Constants.ZK_REG_PATH)
registry = Registry(zk_quorum, zk_reg_path)
amHost, amUnsecuredPort, amSecuredPort = registry.readAMHostPort()
self.hostname = amHost
self.secured_port = amSecuredPort
self.config.set(AgentConfig.SERVER_SECTION, "hostname", self.hostname)
self.config.set(AgentConfig.SERVER_SECTION, "secured_port", self.secured_port)
self.server_url = 'https://' + self.hostname + ':' + self.secured_port
self.registerUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + SLIDER_REL_PATH_REGISTER
self.heartbeatUrl = self.server_url + SLIDER_PATH_AGENTS + self.label + SLIDER_REL_PATH_HEARTBEAT
return
self.cachedconnect = None # Previous connection is broken now
retry = True
finally:
# Sleep for some time
timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
- self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
self.heartbeat_wait_event.wait(timeout=timeout)
# Sleep a bit more to allow STATUS_COMMAND results to be collected
# and sent in one heartbeat. Also avoid server overload with heartbeats
time.sleep(self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
pass
logger.info("Controller stopped heart-beating.")
def create_start_command(self, stored_command):
taskId = int(stored_command['taskId'])
taskId = taskId + 1
stored_command['taskId'] = taskId
stored_command['commandId'] = "{0}-1".format(taskId)
stored_command[Constants.AUTO_GENERATED] = True
return stored_command
pass
def updateStateBasedOnCommand(self, commands, createStatus=True):
# A STOP command is paired with the START command to provide agents the
# capability to gracefully stop the app if possible. The STOP command needs
# to be stored since the AM might not be able to provide it since it could
# have lost the container state for whatever reasons. The STOP command has
# no other role to play in the Agent state transition so it is removed from
# the commands list.
index = 0
deleteIndex = 0
delete = False
# break only if an INSTALL command is found, since we might get a STOP
# command for a START command
for command in commands:
if command["roleCommand"] == "START":
self.componentExpectedState = State.STARTED
self.componentActualState = State.STARTING
self.failureCount = 0
if createStatus:
self.statusCommand = self.createStatusCommand(command)
# The STOP command index is stored to be deleted
if command["roleCommand"] == "STOP":
self.stopCommand = command
delete = True
deleteIndex = index
if command["roleCommand"] == "INSTALL":
self.componentExpectedState = State.INSTALLED
self.componentActualState = State.INSTALLING
self.failureCount = 0
break
index += 1
# Delete the STOP command
if delete:
del commands[deleteIndex]
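# Map command results and health reports back onto the state machine: a
# COMPLETED command promotes the actual state to the expected one, a FAILED
# command or an "INSTALLED" health report marks the component FAILED and bumps
# the failure count, and a "STARTED" health report resets the failure count.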
def updateStateBasedOnResult(self, commandResult):
if len(commandResult) > 0:
if "commandStatus" in commandResult:
if commandResult["commandStatus"] == ActionQueue.COMPLETED_STATUS:
self.componentActualState = self.componentExpectedState
self.logStates()
pass
pass
if commandResult["commandStatus"] == ActionQueue.FAILED_STATUS:
self.componentActualState = State.FAILED
self.failureCount += 1
self.logStates()
pass
if "healthStatus" in commandResult:
if commandResult["healthStatus"] == "INSTALLED":
# Mark it FAILED as it's a failure that may be remedied by auto-start or container restart
self.componentActualState = State.FAILED
self.failureCount += 1
self.logStates()
if (commandResult["healthStatus"] == "STARTED") and (self.componentActualState != State.STARTED):
self.componentActualState = State.STARTED
self.failureCount = 0
self.logStates()
pass
pass
pass
def logStates(self):
logger.info("Component states (result): Expected: " + str(self.componentExpectedState) + \
" and Actual: " + str(self.componentActualState))
pass
def createStatusCommand(self, command):
statusCommand = {}
statusCommand["clusterName"] = command["clusterName"]
statusCommand["commandParams"] = command["commandParams"]
statusCommand["commandType"] = "STATUS_COMMAND"
statusCommand["roleCommand"] = "STATUS"
statusCommand["componentName"] = command["role"]
statusCommand["configurations"] = {}
statusCommand["configurations"]["global"] = command["configurations"]["global"]
statusCommand["hostLevelParams"] = command["hostLevelParams"]
statusCommand["serviceName"] = command["serviceName"]
statusCommand["taskId"] = "status"
statusCommand[Constants.AUTO_GENERATED] = True
logger.info("Status command: " + pprint.pformat(statusCommand))
return statusCommand
pass
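# run() wires up the ActionQueue, Register and Heartbeat helpers, then keeps
# calling registerAndHeartbeat() until no further re-registration is requested
# (for example after a terminate command or an unrecoverable SSL failure).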
def run(self):
self.agentToggleLogger = AgentToggleLogger("info")
self.actionQueue = ActionQueue(self.config, controller=self, agentToggleLogger=self.agentToggleLogger)
self.actionQueue.start()
self.register = Register(self.config)
self.heartbeat = Heartbeat(self.actionQueue, self.config, self.agentToggleLogger)
opener = urllib2.build_opener()
urllib2.install_opener(opener)
while True:
self.repeatRegistration = False
self.registerAndHeartbeat()
if not self.repeatRegistration:
break
logger.info("Controller stopped.")
pass
def registerAndHeartbeat(self):
registerResponse = self.registerWithServer()
message = registerResponse['response']
logger.info("Response from server = " + message)
if self.isRegistered:
# Process callbacks
for callback in self.registration_listeners:
callback()
time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
self.heartbeatWithServer()
logger.info("Controller stopped heartbeating.")
def restartAgent(self):
os._exit(AGENT_AUTO_RESTART_EXIT_CODE)
pass
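# All traffic to the AM goes through security.CachedHTTPSConnection, created
# lazily and reused across requests; it is dropped (set back to None) whenever
# a heartbeat failure suggests the connection is broken.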
def sendRequest(self, url, data):
response = None
try:
if self.cachedconnect is None: # Lazy initialization
self.cachedconnect = security.CachedHTTPSConnection(self.config)
req = urllib2.Request(url, data, {'Content-Type': 'application/json'})
response = self.cachedconnect.request(req)
return response
except Exception:
exc_type, exc_value, exc_traceback = sys.exc_info()
logger.error("Exception raised", exc_info=(exc_type, exc_value, exc_traceback))
if response is None:
err_msg = 'Request failed! Data: ' + str(data)
logger.warn(err_msg)
return {'exitstatus': 1, 'log': err_msg}
else:
err_msg = ('Response parsing failed! Request data: ' + str(data)
+ '; Response: ' + str(response))
logger.warn(err_msg)
return {'exitstatus': 1, 'log': err_msg}
# Basic window that only counts failures till the window duration expires
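# For example, with getErrorWindow() returning (3, 5) the component may be
# auto-restarted up to 3 times within a 5 minute window; once more than 5
# minutes have passed since tracking began the window and counter are reset.
# A non-positive max or window disables the check and always allows restarts.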
def shouldAutoRestart(self):
max, window = self.config.getErrorWindow()
if max <= 0 or window <= 0:
return True
seconds_now = time.time()
if self.autoRestartTrackingSince == 0:
self.autoRestartTrackingSince = seconds_now
self.autoRestartFailures = 1
return True
self.autoRestartFailures += 1
minutes = math.floor((seconds_now - self.autoRestartTrackingSince) / 60)
if self.autoRestartFailures > max:
logger.info("Auto restart not allowed due to " + str(self.autoRestartFailures) + " failures in " + str(minutes) +
" minutes. Max restarts allowed is " + str(max) + " in " + str(window) + " minutes.")
return False
if minutes > window:
logger.info("Resetting window as number of minutes passed is " + str(minutes))
self.autoRestartTrackingSince = seconds_now
self.autoRestartFailures = 1
return True
return True
pass
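# main() below is a minimal standalone entry point, mainly useful for manual
# testing: it sets up console logging and drives the Controller directly. In a
# normal agent deployment the Controller is presumably created and started by
# the agent's main process rather than via this entry point.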
def main(argv=None):
# Allow Ctrl-C
signal.signal(signal.SIGINT, signal.SIG_DFL)
logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - \
%(message)s")
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv))
config = AgentConfig()
collector = Controller(config)
collector.start()
collector.run()
if __name__ == '__main__':
main()