blob: 30f9a4d511180204e0894f8348eca429225e094c [file] [log] [blame]
#!/usr/bin/env python2.6
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import logging
import signal
import json
import sys
import os
import time
import threading
import urllib2
import pprint
from random import randint
import hostname
import AmbariConfig
import ProcessHelper
from Heartbeat import Heartbeat
from Register import Register
from ActionQueue import ActionQueue
import security
from NetUtil import NetUtil
import ssl
logger = logging.getLogger()
AGENT_AUTO_RESTART_EXIT_CODE = 77
class Controller(threading.Thread):
def __init__(self, config, range=30):
threading.Thread.__init__(self)
logger.debug('Initializing Controller RPC thread.')
self.lock = threading.Lock()
self.safeMode = True
self.credential = None
self.config = config
self.hostname = hostname.hostname()
server_secured_url = 'https://' + config.get('server', 'hostname') + ':' + config.get('server', 'secured_url_port')
self.registerUrl = server_secured_url + '/agent/v1/register/' + self.hostname
self.heartbeatUrl = server_secured_url + '/agent/v1/heartbeat/' + self.hostname
self.netutil = NetUtil()
self.responseId = -1
self.repeatRegistration = False
self.isRegistered = False
self.cachedconnect = None
self.range = range
self.hasMappedComponents = True
# Event is used for synchronizing heartbeat iterations (to make possible
# manual wait() interruption between heartbeats )
self.heartbeat_wait_event = threading.Event()
def __del__(self):
logger.info("Server connection disconnected.")
pass
def registerWithServer(self):
retry=False
firstTime=True
id = -1
ret = {}
while not self.isRegistered:
try:
data = json.dumps(self.register.build(id))
logger.info("Registering with the server " + pprint.pformat(data))
response = self.sendRequest(self.registerUrl, data)
ret = json.loads(response)
exitstatus = 0
# exitstatus is a code of error which was rised on server side.
# exitstatus = 0 (OK - Default)
# exitstatus = 1 (Registration failed because
# different version of agent and server)
if 'exitstatus' in ret.keys():
exitstatus = int(ret['exitstatus'])
# log - message, which will be printed to agents log
if 'log' in ret.keys():
log = ret['log']
if exitstatus == 1:
logger.error(log)
self.isRegistered = False
self.repeatRegistration=False
return ret
logger.info("Registered with the server with " + pprint.pformat(ret))
print("Registered with the server")
self.responseId= int(ret['responseId'])
self.isRegistered = True
if 'statusCommands' in ret.keys():
logger.info("Got status commands on registration " + pprint.pformat(ret['statusCommands']) )
self.addToQueue(ret['statusCommands'])
pass
else:
self.hasMappedComponents = False
pass
except ssl.SSLError:
self.repeatRegistration=False
self.isRegistered = False
return
except Exception, err:
# try a reconnect only after a certain amount of random time
delay = randint(0, self.range)
logger.info("Unable to connect to: " + self.registerUrl, exc_info = True)
""" Sleeping for {0} seconds and then retrying again """.format(delay)
time.sleep(delay)
pass
pass
return ret
def addToQueue(self, commands):
"""Add to the queue for running the commands """
""" Put the required actions into the Queue """
""" Verify if the action is to reboot or not """
if not commands:
logger.debug("No commands from the server : " + pprint.pformat(commands))
else:
"""Only add to the queue if not empty list """
self.actionQueue.put(commands)
pass
# For testing purposes
DEBUG_HEARTBEAT_RETRIES = 0
DEBUG_SUCCESSFULL_HEARTBEATS = 0
DEBUG_STOP_HEARTBEATING = False
def heartbeatWithServer(self):
self.DEBUG_HEARTBEAT_RETRIES = 0
self.DEBUG_SUCCESSFULL_HEARTBEATS = 0
retry = False
certVerifFailed = False
config = AmbariConfig.config
hb_interval = config.get('heartbeat', 'state_interval')
#TODO make sure the response id is monotonically increasing
id = 0
while not self.DEBUG_STOP_HEARTBEATING:
try:
if not retry:
data = json.dumps(
self.heartbeat.build(self.responseId, int(hb_interval), self.hasMappedComponents))
logger.debug("Sending request: " + data)
pass
else:
self.DEBUG_HEARTBEAT_RETRIES += 1
response = self.sendRequest(self.heartbeatUrl, data)
response = json.loads(response)
logger.debug('Got server response: ' + pprint.pformat(response))
serverId=int(response['responseId'])
if 'hasMappedComponents' in response.keys():
self.hasMappedComponents = response['hasMappedComponents'] != False
if 'registrationCommand' in response.keys():
# check if the registration command is None. If none skip
if response['registrationCommand'] is not None:
logger.info("RegistrationCommand received - repeat agent registration")
self.isRegistered = False
self.repeatRegistration = True
return
if serverId!=self.responseId+1:
logger.error("Error in responseId sequence - restarting")
self.restartAgent()
else:
self.responseId=serverId
if 'executionCommands' in response.keys():
self.addToQueue(response['executionCommands'])
pass
if 'statusCommands' in response.keys():
self.addToQueue(response['statusCommands'])
pass
if "true" == response['restartAgent']:
logger.error("Got restartAgent command")
self.restartAgent()
else:
logger.info("No commands sent from the Server.")
pass
if retry:
print("Reconnected to the server")
logger.info("Reconnected to the server")
retry=False
certVerifFailed = False
self.DEBUG_SUCCESSFULL_HEARTBEATS += 1
self.DEBUG_HEARTBEAT_RETRIES = 0
self.heartbeat_wait_event.clear()
except ssl.SSLError:
self.repeatRegistration=False
self.isRegistered = False
return
except Exception, err:
#randomize the heartbeat
delay = randint(0, self.range)
time.sleep(delay)
if "code" in err:
logger.error(err.code)
else:
logger.error("Unable to connect to: " + self.heartbeatUrl + " due to " + str(err))
logger.debug("Details: " + str(err), exc_info=True)
if not retry:
print("Connection to the server was lost. Reconnecting...")
if 'certificate verify failed' in str(err) and not certVerifFailed:
print("Server certificate verify failed. Did you regenerate server certificate?")
certVerifFailed = True
self.cachedconnect = None # Previous connection is broken now
retry=True
# Sleep for some time
timeout = self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC \
- self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS
self.heartbeat_wait_event.wait(timeout = timeout)
# Sleep a bit more to allow STATUS_COMMAND results to be collected
# and sent in one heartbeat. Also avoid server overload with heartbeats
time.sleep(self.netutil.MINIMUM_INTERVAL_BETWEEN_HEARTBEATS)
pass
def run(self):
self.actionQueue = ActionQueue(self.config, controller=self)
self.actionQueue.start()
self.register = Register(self.config)
self.heartbeat = Heartbeat(self.actionQueue, self.config)
opener = urllib2.build_opener()
urllib2.install_opener(opener)
while True:
self.repeatRegistration = False
self.registerAndHeartbeat()
if not self.repeatRegistration:
break
pass
def registerAndHeartbeat(self):
registerResponse = self.registerWithServer()
message = registerResponse['response']
logger.info("Response from server = " + message)
if self.isRegistered:
time.sleep(self.netutil.HEARTBEAT_IDDLE_INTERVAL_SEC)
self.heartbeatWithServer()
def restartAgent(self):
os._exit(AGENT_AUTO_RESTART_EXIT_CODE)
pass
def sendRequest(self, url, data):
if self.cachedconnect is None: # Lazy initialization
self.cachedconnect = security.CachedHTTPSConnection(self.config)
req = urllib2.Request(url, data, {'Content-Type': 'application/json'})
response = self.cachedconnect.request(req)
return response
def main(argv=None):
# Allow Ctrl-C
signal.signal(signal.SIGINT, signal.SIG_DFL)
logger.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s %(filename)s:%(lineno)d - \
%(message)s")
stream_handler = logging.StreamHandler()
stream_handler.setFormatter(formatter)
logger.addHandler(stream_handler)
logger.info('Starting Server RPC Thread: %s' % ' '.join(sys.argv))
config = AmbariConfig.config
collector = Controller(config)
collector.start()
collector.run()
if __name__ == '__main__':
main()