blob: fe5760d0a41b09d8abf5560135112da64f5a6f31 [file] [log] [blame]
#!/usr/bin/env python
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import logging
import signal
import json
import sys
import os
import time
import threading
import urllib2
import pprint
from random import randint
from AgentConfig import AgentConfig
from Heartbeat import Heartbeat
from Register import Register
from ActionQueue import ActionQueue
from NetUtil import NetUtil
import ssl
import ProcessHelper
# Module-wide logger; this is the root logger (getLogger() with no name), to
# which main() attaches a stream handler.
logger = logging.getLogger()

# Process exit status used by Controller.restartAgent(); presumably the agent
# launcher treats it as "restart me" -- confirm against the launcher script.
AGENT_AUTO_RESTART_EXIT_CODE = 77


class State:
    """Lifecycle states of the managed component as tracked by Controller.

    Controller keeps an *expected* and an *actual* value drawn from these
    integer constants and compares them to decide when to stop the agent.
    """
    INIT = 0
    INSTALLING = 1
    INSTALLED = 2
    STARTING = 3
    STARTED = 4
    FAILED = 5
class Controller(threading.Thread):
    """Agent RPC thread: registers with the Slider server, then heartbeats.

    The server endpoints are ``/ws/v1/slider/agents/<label>/register`` and
    ``/ws/v1/slider/agents/<label>/heartbeat``.  Commands returned by the
    server are put on an ``ActionQueue``; results reported back through the
    ``Heartbeat`` builder drive a small expected/actual component state
    machine (``State``) that decides when the agent should stop itself.
    """

    # For testing purposes
    DEBUG_HEARTBEAT_RETRIES = 0
    DEBUG_SUCCESSFULL_HEARTBEATS = 0
    DEBUG_STOP_HEARTBEATING = False
    # failureCount at or above which a component that was asked to start,
    # but is now FAILED, causes the agent to stop (see shouldStopAgent).
    MAX_FAILURE_COUNT_TO_STOP = 2

    def __init__(self, config, range=30):
        """Create the controller thread (it does no network I/O yet).

        :param config: AgentConfig providing the server hostname/port and the
            agent label (``getLabel()``) used in the REST URLs.
        :param range: inclusive upper bound, in seconds, of the random delay
            slept before retrying a failed registration or heartbeat.  The
            name shadows the builtin but is kept for caller compatibility.
        """
        threading.Thread.__init__(self)
        logger.debug('Initializing Controller RPC thread.')
        self.lock = threading.Lock()
        self.safeMode = True
        self.credential = None
        self.config = config
        self.hostname = config.getLabel()
        server_url = ('http://' +
                      config.get(AgentConfig.SERVER_SECTION, 'hostname') +
                      ':' + config.get(AgentConfig.SERVER_SECTION, 'port'))
        agent_url = server_url + '/ws/v1/slider/agents/' + self.hostname
        self.registerUrl = agent_url + '/register'
        self.heartbeatUrl = agent_url + '/heartbeat'
        self.netutil = NetUtil()
        self.responseId = -1
        self.repeatRegistration = False
        self.isRegistered = False
        self.cachedconnect = None
        self.range = range
        self.hasMappedComponents = True
        # Event is used for synchronizing heartbeat iterations (to make possible
        # manual wait() interruption between heartbeats )
        self.heartbeat_wait_event = threading.Event()
        # List of callbacks that are called at agent registration
        self.registration_listeners = []
        self.componentExpectedState = State.INIT
        self.componentActualState = State.INIT
        self.statusCommand = None
        self.failureCount = 0

    def __del__(self):
        logger.info("Server connection disconnected.")

    def registerWithServer(self):
        """Register with the server, retrying with a random delay on errors.

        :returns: the decoded registration response (always a dict).  The loop
            stops early, leaving ``isRegistered`` False, when the server
            answers ``exitstatus == 1`` (that response is returned) or when
            an ``ssl.SSLError`` occurs (``{}`` is returned).
        """
        registration_id = -1
        ret = {}
        while not self.isRegistered:
            try:
                data = json.dumps(self.register.build(registration_id))
                logger.info("Registering with the server " + pprint.pformat(data))
                response = self.sendRequest(self.registerUrl, data)
                ret = json.loads(response)
                # exitstatus is a code of error which was raised on server side.
                # exitstatus = 0 (OK - Default)
                # exitstatus = 1 (Registration failed because
                #                 different version of agent and server)
                exitstatus = int(ret.get('exitstatus', 0))
                # log - message, which will be printed to agents log.
                # Default to '' so a missing key cannot raise NameError below.
                log = ret.get('log', '')
                if exitstatus == 1:
                    logger.error(log)
                    self.isRegistered = False
                    self.repeatRegistration = False
                    return ret
                logger.info("Registered with the server with " + pprint.pformat(ret))
                print("Registered with the server")
                self.responseId = int(ret['responseId'])
                self.isRegistered = True
                if 'statusCommands' in ret:
                    logger.info("Got status commands on registration " +
                                pprint.pformat(ret['statusCommands']))
                    self.addToQueue(ret['statusCommands'])
                else:
                    self.hasMappedComponents = False
            except ssl.SSLError:
                self.repeatRegistration = False
                self.isRegistered = False
                # Return a dict (not None) so callers can safely call .get().
                return {}
            except Exception:
                # try a reconnect only after a certain amount of random time
                delay = randint(0, self.range)
                logger.info("Unable to connect to: " + self.registerUrl, exc_info=True)
                logger.info("Sleeping for %s seconds and then retrying again", delay)
                time.sleep(delay)
        return ret

    def addToQueue(self, commands):
        """Put ``commands`` on the action queue as one item, unless empty/None."""
        if commands:
            self.actionQueue.put(commands)
        else:
            logger.debug("No commands from the server : " + pprint.pformat(commands))

    def shouldStopAgent(self):
        """Return True when a component that should be STARTED has failed
        at least MAX_FAILURE_COUNT_TO_STOP times."""
        return (self.componentActualState == State.FAILED
                and self.componentExpectedState == State.STARTED
                and self.failureCount >= Controller.MAX_FAILURE_COUNT_TO_STOP)

    def heartbeatWithServer(self):
        """Heartbeat until stopped, told to re-register, or an SSL error occurs.

        Each iteration builds a heartbeat (or re-sends the previous payload
        after a failure), applies the server's response, then waits on
        ``heartbeat_wait_event`` plus a short sleep before the next round.
        """
        self.DEBUG_HEARTBEAT_RETRIES = 0
        self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
        retry = False
        certVerifFailed = False
        data = None
        while not self.DEBUG_STOP_HEARTBEATING:
            if self.shouldStopAgent():
                logger.info("Component instance has stopped, stopping the agent ...")
                ProcessHelper.stopAgent()

            commandResult = {}
            try:
                if retry and data is not None:
                    # Re-send the payload whose delivery failed last time.
                    self.DEBUG_HEARTBEAT_RETRIES += 1
                else:
                    # Also taken when retrying but no payload was ever built
                    # (build() raised), avoiding an endless NameError loop.
                    # commandResult is presumably filled in place by build().
                    data = json.dumps(
                        self.heartbeat.build(commandResult, self.responseId,
                                             self.hasMappedComponents))
                    self.updateStateBasedOnResult(commandResult)
                    logger.debug("Sending request: " + data)

                response = json.loads(self.sendRequest(self.heartbeatUrl, data))
                logger.debug('Got server response: ' + pprint.pformat(response))
                serverId = int(response['responseId'])

                if 'hasMappedComponents' in response:
                    self.hasMappedComponents = response['hasMappedComponents'] != False

                # A non-None registrationCommand means: re-register.
                if response.get('registrationCommand') is not None:
                    logger.info(
                        "RegistrationCommand received - repeat agent registration")
                    self.isRegistered = False
                    self.repeatRegistration = True
                    return

                if serverId != self.responseId + 1:
                    logger.error("Error in responseId sequence - restarting")
                    self.restartAgent()
                else:
                    self.responseId = serverId

                if 'executionCommands' in response:
                    self.updateStateBasedOnCommand(response['executionCommands'])
                    self.addToQueue(response['executionCommands'])
                if 'statusCommands' in response and len(response['statusCommands']) > 0:
                    self.addToQueue(response['statusCommands'])

                # .get(): a response without this key is not a lost connection.
                if "true" == response.get('restartAgent'):
                    logger.error("Got restartAgent command")
                    self.restartAgent()
                else:
                    logger.info("No commands sent from the Server.")

                # Add a status command
                if (self.componentActualState != State.STARTING
                        and self.componentExpectedState == State.STARTED
                        and self.statusCommand is not None):
                    self.addToQueue([self.statusCommand])

                if retry:
                    print("Reconnected to the server")
                    logger.info("Reconnected to the server")
                retry = False
                certVerifFailed = False
                self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
                self.DEBUG_HEARTBEAT_RETRIES = 0
                self.heartbeat_wait_event.clear()
            except ssl.SSLError:
                self.repeatRegistration = False
                self.isRegistered = False
                return
            except Exception as err:
                # randomize the heartbeat
                delay = randint(0, self.range)
                time.sleep(delay)
                # HTTP errors (e.g. urllib2.HTTPError) expose a .code attribute;
                # `"code" in err` did not test for that attribute.
                if hasattr(err, 'code'):
                    logger.error(err.code)
                else:
                    logger.error(
                        "Unable to connect to: " + self.heartbeatUrl + " due to " + str(
                            err))
                    logger.debug("Details: " + str(err), exc_info=True)
                    if not retry:
                        print("Connection to the server was lost. Reconnecting...")
                    if 'certificate verify failed' in str(err) and not certVerifFailed:
                        print(
                            "Server certificate verify failed. Did you regenerate server certificate?")
                        certVerifFailed = True
                self.cachedconnect = None  # Previous connection is broken now
                retry = True
            # Sleep for some time
            timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
                - self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
            self.heartbeat_wait_event.wait(timeout=timeout)
            # Sleep a bit more to allow STATUS_COMMAND results to be collected
            # and sent in one heartbeat. Also avoid server overload with heartbeats
            time.sleep(self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
        logger.info("Controller stopped heart-beating.")

    def updateStateBasedOnCommand(self, commands):
        """Update expected/actual states from the first execution command.

        Only the first command is inspected (the loop always breaks after one
        iteration).  START also prepares the periodic status command.
        """
        for command in commands:
            role_command = command["roleCommand"]
            if role_command == "START":
                self.componentExpectedState = State.STARTED
                self.componentActualState = State.STARTING
                self.failureCount = 0
                self.statusCommand = self.createStatusCommand(command)
            elif role_command == "INSTALL":
                self.componentExpectedState = State.INSTALLED
                self.componentActualState = State.INSTALLING
                self.failureCount = 0
            break

    def updateStateBasedOnResult(self, commandResult):
        """Update the actual state / failure count from a command result dict.

        Reads the optional ``commandStatus`` and ``healthStatus`` keys; an
        empty result is ignored.
        """
        if len(commandResult) == 0:
            return
        if "commandStatus" in commandResult:
            status = commandResult["commandStatus"]
            if status == ActionQueue.COMPLETED_STATUS:
                self.componentActualState = self.componentExpectedState
                self.logStates()
            if status == ActionQueue.FAILED_STATUS:
                self.componentActualState = State.FAILED
                self.failureCount += 1
                self.logStates()
        if "healthStatus" in commandResult:
            health = commandResult["healthStatus"]
            if health == "INSTALLED":
                self.componentActualState = State.FAILED
                self.failureCount += 1
                self.logStates()
            elif health == "STARTED" and self.componentActualState != State.STARTED:
                self.componentActualState = State.STARTED
                self.failureCount = 0
                self.logStates()

    def logStates(self):
        logger.info("Component states (result): Expected: %s and Actual: %s",
                    self.componentExpectedState, self.componentActualState)

    def createStatusCommand(self, command):
        """Derive the STATUS_COMMAND to send periodically from a START command."""
        statusCommand = {
            "clusterName": command["clusterName"],
            "commandParams": command["commandParams"],
            "commandType": "STATUS_COMMAND",
            "roleCommand": "STATUS",
            "componentName": command["role"],
            "configurations": {"global": command["configurations"]["global"]},
            "hostLevelParams": command["hostLevelParams"],
            "serviceName": command["serviceName"],
            "taskId": "status",
            'auto_generated': True,
        }
        # Previously unreachable (placed after `return`).  Logged at debug
        # level because it includes the "global" configuration section.
        logger.debug("Status command: " + pprint.pformat(statusCommand))
        return statusCommand

    def run(self):
        """Thread body: set up queue/builders, then register and heartbeat,
        repeating registration whenever the server requests it."""
        self.actionQueue = ActionQueue(self.config, controller=self)
        self.actionQueue.start()
        self.register = Register(self.config)
        self.heartbeat = Heartbeat(self.actionQueue, self.config)
        opener = urllib2.build_opener()
        urllib2.install_opener(opener)
        while True:
            self.repeatRegistration = False
            self.registerAndHeartbeat()
            if not self.repeatRegistration:
                break
        logger.info("Controller stopped.")

    def registerAndHeartbeat(self):
        """Register once; if that succeeded, notify listeners and heartbeat."""
        registerResponse = self.registerWithServer()
        # Tolerate a None/partial response (SSL error, version mismatch).
        message = (registerResponse or {}).get('response')
        logger.info("Response from server = %s", message)
        if self.isRegistered:
            # Process callbacks
            for callback in self.registration_listeners:
                callback()
            time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
            self.heartbeatWithServer()
        logger.info("Controller stopped heartbeating.")

    def restartAgent(self):
        """Exit the process immediately with AGENT_AUTO_RESTART_EXIT_CODE."""
        os._exit(AGENT_AUTO_RESTART_EXIT_CODE)

    def sendRequest(self, url, data):
        """POST ``data`` as JSON to ``url`` and return the raw response body."""
        req = urllib2.Request(url, data, {'Content-Type': 'application/json'})
        f = urllib2.urlopen(req)
        try:
            return f.read()
        finally:
            # Close even if read() fails so the connection is not leaked.
            f.close()
def main(argv=None):
    """Run a Controller from the command line and wait for it to finish.

    :param argv: arguments to log at startup; defaults to ``sys.argv``.
    """
    # Allow Ctrl-C
    signal.signal(signal.SIGINT, signal.SIG_DFL)
    if argv is None:
        argv = sys.argv
    logger.setLevel(logging.INFO)
    # Adjacent literal instead of a backslash continuation inside the string,
    # which would embed the next line's indentation in the log format.
    formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - "
                                  "%(message)s")
    stream_handler = logging.StreamHandler()
    stream_handler.setFormatter(formatter)
    logger.addHandler(stream_handler)
    logger.info('Starting Server RPC Thread: %s', ' '.join(argv))
    config = AgentConfig()
    collector = Controller(config)
    # start() already executes run() on the new thread; calling run() here as
    # well would register and heartbeat a second time from the main thread.
    collector.start()
    collector.join()


if __name__ == '__main__':
    main()