| #!/usr/bin/env python2.6 |
| |
| ''' |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| ''' |
| |
| import logging |
| import logging.handlers |
| import signal |
| import json |
| import hostname |
| import sys, traceback |
| import time |
| import threading |
| import urllib2 |
| from urllib2 import Request, urlopen, URLError |
| import httplib |
| import ssl |
| import AmbariConfig |
| import pprint |
| import ProcessHelper |
| from Heartbeat import Heartbeat |
| from Register import Register |
| from ActionQueue import ActionQueue |
| from optparse import OptionParser |
| from wsgiref.simple_server import ServerHandler |
| import security |
| from NetUtil import NetUtil |
| from random import randrange, randint |
| |
# Module-wide logger. getLogger() is called without a name, so this is the
# root logger; main() below sets its level and attaches the stream handler.
logger = logging.getLogger()
| |
class Controller(threading.Thread):
    """Registers this host with the server and then exchanges heartbeats.

    The register and heartbeat URLs are built from the ``server`` section of
    the supplied config. Commands returned by the server are handed to an
    ``ActionQueue``.

    NOTE: ``start()`` is overridden and does not call ``Thread.start()``; it
    only creates the collaborators. The work loop lives in ``run()``.
    """

    # For testing purposes
    DEBUG_HEARTBEAT_RETRIES = 0
    DEBUG_SUCCESSFULL_HEARTBEATS = 0
    DEBUG_STOP_HEARTBITTING = False

    def __init__(self, config, range=120):
        """Store configuration and derive the server URLs.

        config -- object exposing ``get(section, option)``; the ``server``
                  section must provide ``hostname`` and ``secured_url_port``.
        range  -- inclusive upper bound, in seconds, of the random back-off
                  slept after a failed register or heartbeat attempt.
        """
        threading.Thread.__init__(self)
        logger.debug('Initializing Controller RPC thread.')
        self.lock = threading.Lock()
        self.safeMode = True
        self.credential = None
        self.config = config
        self.hostname = hostname.hostname()
        server_secured_url = ('https://' + config.get('server', 'hostname') +
                              ':' + config.get('server', 'secured_url_port'))
        self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname
        self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname
        self.netutil = NetUtil()
        self.responseId = -1
        self.repeatRegistration = False
        self.cachedconnect = None  # created lazily by sendRequest()
        self.range = range

    def start(self):
        """Create and start the action queue and the register/heartbeat builders.

        This replaces ``Thread.start()`` and does not spawn a thread.
        """
        self.actionQueue = ActionQueue(self.config)
        self.actionQueue.start()
        self.register = Register()
        self.heartbeat = Heartbeat(self.actionQueue)

    def __del__(self):
        logger.info("Server connection disconnected.")

    def registerWithServer(self):
        """Register with the server, retrying until an attempt succeeds.

        Each failed attempt is followed by a random sleep of 0..``self.range``
        seconds. On success ``self.responseId`` is taken from the response and
        any ``statusCommands`` are queued.

        Returns the decoded JSON registration response.
        """
        registration_id = -1
        ret = {}
        registered = False

        while not registered:
            try:
                data = json.dumps(self.register.build(registration_id))
                logger.info("Registering with the server " + pprint.pformat(data))
                response = self.sendRequest(self.registerUrl, data)
                ret = json.loads(response)

                logger.info("Registered with the server with " + pprint.pformat(ret))
                print("Registered with the server")
                self.responseId = int(ret['responseId'])
                registered = True
                if 'statusCommands' in ret:
                    logger.info("Got status commands on registration " +
                                pprint.pformat(ret['statusCommands']))
                    self.addToQueue(ret['statusCommands'])
            except Exception:
                # try a reconnect only after a certain amount of random time
                delay = randint(0, self.range)
                logger.info("Unable to connect to: " + self.registerUrl, exc_info=True)
                # The original built this message but never emitted it.
                logger.info("Sleeping for {0} seconds and then retrying again".format(delay))
                time.sleep(delay)
        return ret

    def addToQueue(self, commands):
        """Put every command in ``commands`` onto the action queue.

        An empty or ``None`` value is only logged.
        """
        if not commands:
            logger.info("No commands from the server : " + pprint.pformat(commands))
            return
        for command in commands:
            logger.info("Adding command to the action queue: \n" +
                        pprint.pformat(command))
            self.actionQueue.put(command)

    def heartbeatWithServer(self):
        """Send heartbeats until stopped or until re-registration is requested.

        The loop ends when ``DEBUG_STOP_HEARTBITTING`` becomes true, or returns
        early (after setting ``self.repeatRegistration``) when the response
        carries a non-None ``registrationCommand``. A failed exchange drops the
        cached connection, sleeps a random 0..``self.range`` seconds and
        resends the same payload on the next iteration.
        """
        self.DEBUG_HEARTBEAT_RETRIES = 0
        self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
        retry = False
        certVerifFailed = False
        data = None

        # Reads the module-level AmbariConfig.config rather than self.config.
        config = AmbariConfig.config
        hb_interval = config.get('heartbeat', 'state_interval')

        # TODO make sure the response id is monotonically increasing
        while not self.DEBUG_STOP_HEARTBITTING:
            try:
                if retry:
                    self.DEBUG_HEARTBEAT_RETRIES += 1
                # Build a fresh payload for a normal beat. Also build one when
                # an earlier build failed, so a retry never hits an unbound
                # ``data``.
                if not retry or data is None:
                    data = json.dumps(
                        self.heartbeat.build(self.responseId, int(hb_interval)))
                response = self.sendRequest(self.heartbeatUrl, data)
                response = json.loads(response)

                logger.info('Got server response: ' + pprint.pformat(response))

                serverId = int(response['responseId'])

                # A present-but-None registration command is ignored.
                if response.get('registrationCommand') is not None:
                    logger.info("RegistrationCommand received - repeat agent registration")
                    self.repeatRegistration = True
                    return

                if serverId != self.responseId + 1:
                    logger.error("Error in responseId sequence - restarting")
                    self.restartAgent()
                else:
                    self.responseId = serverId

                if 'executionCommands' in response:
                    self.addToQueue(response['executionCommands'])
                if 'statusCommands' in response:
                    self.addToQueue(response['statusCommands'])
                # .get(): a missing key must not be handled as a lost
                # connection after the commands above were already queued.
                if "true" == response.get('restartAgent'):
                    logger.error("Got restartAgent command")
                    self.restartAgent()
                else:
                    logger.info("No commands sent from the Server.")

                if retry:
                    print("Reconnected to the server")
                    logger.info("Reconnected to the server")
                retry = False
                certVerifFailed = False
                self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
                self.DEBUG_HEARTBEAT_RETRIES = 0
            except Exception as err:
                # An HTTP-style error exposes the status as an attribute;
                # ``"code" in err`` never tested for that.
                if hasattr(err, "code"):
                    logger.error(err.code)
                logger.error("Unable to connect to: " + self.heartbeatUrl +
                             " due to " + str(err))
                logger.debug("Details: " + str(err), exc_info=True)
                if not retry:
                    print("Connection to the server was lost. Reconnecting...")
                if 'certificate verify failed' in str(err) and not certVerifFailed:
                    print("Server certificate verify failed. Did you regenerate server certificate?")
                    certVerifFailed = True
                self.cachedconnect = None  # Previous connection is broken now
                retry = True
                # randomize the heartbeat; sleep only after the failure has
                # been logged so the log line is not delayed by the back-off
                time.sleep(randint(0, self.range))
            if self.actionQueue.isIdle():
                time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
            else:
                time.sleep(self.netutil.HEARTBEAT_NOT_IDDLE_INTERVAL_SEC)

    def run(self):
        """Register and heartbeat, repeating while re-registration is requested."""
        opener = urllib2.build_opener()
        urllib2.install_opener(opener)

        while True:
            self.repeatRegistration = False
            self.registerAndHeartbeat()
            if not self.repeatRegistration:
                break

    def registerAndHeartbeat(self):
        """Register once, log the server's reply, pause, then start heartbeating."""
        registerResponse = self.registerWithServer()
        message = registerResponse['response']
        # %s formatting so a non-string reply cannot raise while logging.
        logger.info("Response from server = %s", message)
        time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
        self.heartbeatWithServer()

    def restartAgent(self):
        """Delegate the agent restart to ``ProcessHelper``."""
        ProcessHelper.restartAgent()

    def sendRequest(self, url, data):
        """Send ``data`` as a JSON request to ``url`` over the cached connection.

        The connection object is created on first use and reused afterwards.
        Returns whatever the connection's ``request()`` returns.
        """
        if self.cachedconnect is None:  # Lazy initialization
            self.cachedconnect = security.CachedHTTPSConnection(self.config)
        req = urllib2.Request(url, data, {'Content-Type': 'application/json'})
        return self.cachedconnect.request(req)
| |
def main(argv=None):
    """Set up console logging and run the controller in the foreground.

    ``argv`` is accepted but not consulted; the start-up log line reports
    ``sys.argv``.
    """
    # Allow Ctrl-C: put SIGINT back to its default handling.
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    logger.setLevel(logging.INFO)
    # NOTE(review): the original literal was split with a backslash line
    # continuation; any indentation on the continuation line would have been
    # part of the format string -- confirm against the pristine file.
    console = logging.StreamHandler()
    console.setFormatter(
        logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - %(message)s"))
    logger.addHandler(console)

    command_line = ' '.join(sys.argv)
    logger.info('Starting Server RPC Thread: %s' % command_line)

    controller = Controller(AmbariConfig.config)
    # Controller.start() only creates the collaborators; run() is then called
    # directly, so the loop executes on the calling thread.
    controller.start()
    controller.run()
| |
# Run the agent controller only when this file is executed as a script.
if __name__ == '__main__':
    main()