#!/usr/bin/env python2.6
'''
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
'''
import logging
import traceback
import logging.handlers
import Queue
import threading
import AmbariConfig
from LiveStatus import LiveStatus
from shell import shellRunner
from FileUtil import writeFile, createStructure, deleteStructure, getFilePath, appendToFile
import json
import pprint
import os
import time
import subprocess
import copy
import puppetExecutor
import tempfile
from Grep import Grep
# Module-wide logger; getLogger() without a name returns the root logger.
logger = logging.getLogger()
# NOTE(review): not referenced anywhere in the visible part of this file;
# -1 looks like a "not computed yet" sentinel - confirm before removing.
installScriptHash = -1
class ActionQueue(threading.Thread):
    """Worker thread that executes the commands received from the server.

    Commands are taken one at a time from a module-level command queue and
    dispatched on their 'commandType': EXECUTION_COMMAND entries are run
    through the puppet executor, STATUS_COMMAND entries are answered with a
    LiveStatus report. Outcomes are pushed to a module-level result queue,
    which result() drains.

    Both queues are module globals, so they are shared by every ActionQueue
    instance living in the same process.
    """

    # Bound as module globals (not class attributes) so that the methods
    # below can refer to the queues by bare name.
    global commandQueue, resultQueue
    commandQueue = Queue.Queue()
    resultQueue = Queue.Queue()

    STATUS_COMMAND = 'STATUS_COMMAND'
    EXECUTION_COMMAND = 'EXECUTION_COMMAND'
    # Seconds run() sleeps once the command queue has been drained.
    IDLE_SLEEP_TIME = 5

    def __init__(self, config):
        """Create the worker (the thread is not started here).

        config -- configuration object offering get()/getint(section, option).
                  Options read: command/maxretries,
                  command/sleepBetweenRetries, puppet/puppetmodules,
                  puppet/puppet_home, puppet/facter_home and agent/prefix.
        """
        super(ActionQueue, self).__init__()
        self.config = config
        self.sh = shellRunner()
        # Deliberately not called `_stop`: on Python 3 that name would shadow
        # the private threading.Thread._stop() method and break join().
        self._stop_event = threading.Event()
        self.maxRetries = config.getint('command', 'maxretries')
        self.sleepInterval = config.getint('command', 'sleepBetweenRetries')
        self.executor = puppetExecutor.puppetExecutor(
            config.get('puppet', 'puppetmodules'),
            config.get('puppet', 'puppet_home'),
            config.get('puppet', 'facter_home'),
            config.get('agent', 'prefix'),
            config)
        # Directory that receives the per-task output-/errors- files.
        self.tmpdir = config.get('agent', 'prefix')
        # Description of the command currently being executed, or None.
        self.commandInProgress = None

    def stop(self):
        """Ask run() to finish; it returns once the queue has been drained."""
        self._stop_event.set()

    def stopped(self):
        """Return True once stop() has been called."""
        return self._stop_event.is_set()

    def getshellinstance(self):
        """ For Testing purpose only."""
        return self.sh

    def put(self, command):
        """Log a command received from the server and queue it for run().

        command -- dict that must at least contain 'commandType'.
        """
        logger.info("The " + command['commandType'] + " from the server is \n" + pprint.pformat(command))
        commandQueue.put(command)

    def getCommandQueue(self):
        """ For Testing purpose only."""
        return commandQueue

    def run(self):
        """Thread body: process queued commands until stop() is called.

        The queue is always drained completely before the stop flag is
        checked again; when it is empty the thread sleeps IDLE_SLEEP_TIME
        seconds.
        """
        while not self.stopped():
            while not commandQueue.empty():
                command = commandQueue.get()
                logger.info("Took an element of Queue: " + pprint.pformat(command))
                commandType = command['commandType']
                if commandType == self.EXECUTION_COMMAND:
                    self._processExecutionCommand(command)
                elif commandType == self.STATUS_COMMAND:
                    self._processStatusCommand(command)
                else:
                    # Log the offending command itself, not some earlier result.
                    logger.warning("Unrecognized command " + pprint.pformat(command))
            if not self.stopped():
                time.sleep(self.IDLE_SLEEP_TIME)

    def _processExecutionCommand(self, command):
        """Run one EXECUTION_COMMAND and queue every report it produced."""
        # Start from an empty list for every command, so that a failing
        # command can never re-publish the outcome of an earlier one.
        reports = []
        try:
            reports = self.executeCommand(command)
        except Exception as err:
            # Best effort: a broken command must not kill the worker thread.
            traceback.print_exc()
            logger.warning(err)
        for entry in reports:
            resultQueue.put((ActionQueue.EXECUTION_COMMAND, entry))

    def _processStatusCommand(self, command):
        """Build the live status for one STATUS_COMMAND and queue it."""
        try:
            # The key lookups live inside the try block so that a malformed
            # status command is logged instead of terminating the thread.
            cluster = command['clusterName']
            service = command['serviceName']
            component = command['componentName']
            status = LiveStatus(cluster, service, component).build()
            logger.info("Got live status for component " + component + " of service " + str(service) +
                        " of cluster " + str(cluster) + "\n" + pprint.pformat(status))
            if status is not None:
                resultQueue.put((ActionQueue.STATUS_COMMAND, status))
        except Exception as err:
            traceback.print_exc()
            logger.warning(err)

    def result(self):
        """Collect everything that is ready to be reported.

        Drains the result queue and, if a command is being executed right
        now, appends an 'IN_PROGRESS' report built from the tail of its
        temporary output file.

        Returns a dict with two lists: 'reports' (execution reports) and
        'componentStatus' (status reports).
        """
        resultReports = []
        resultComponentStatus = []
        while not resultQueue.empty():
            res = resultQueue.get()
            if res[0] == ActionQueue.EXECUTION_COMMAND:
                resultReports.append(res[1])
            elif res[0] == ActionQueue.STATUS_COMMAND:
                resultComponentStatus.append(res[1])

        # Work on a snapshot: executeCommand() resets the attribute to None
        # when the command ends, which may happen while we are in here.
        inProgress = self.commandInProgress
        if inProgress is not None:
            try:
                with open(inProgress['tmpout'], 'r') as outFile:
                    tmpout = outFile.read()
                with open(inProgress['tmperr'], 'r') as errFile:
                    tmperr = errFile.read()
            except Exception as err:
                # The files may not exist yet; report placeholders instead.
                logger.warning(err)
                tmpout = '...'
                tmperr = '...'
            grep = Grep()
            output = grep.tail(tmpout, puppetExecutor.puppetExecutor.OUTPUT_LAST_LINES)
            resultReports.append({
                'role': inProgress['role'],
                'actionId': inProgress['actionId'],
                'taskId': inProgress['taskId'],
                'stdout': grep.filterMarkup(output),
                'clusterName': inProgress['clusterName'],
                'stderr': tmperr,
                # Placeholder exit code used while the command is running.
                'exitCode': 777,
                'serviceName': inProgress['serviceName'],
                'status': 'IN_PROGRESS',
            })
        return {
            'reports': resultReports,
            'componentStatus': resultComponentStatus,
        }

    def registerCommand(self, command):
        """Placeholder: ignores the command and returns an empty dict."""
        return {}

    def statusCommand(self, command):
        """Placeholder: ignores the command and returns an empty dict."""
        return {}

    def executeCommand(self, command):
        """Run one execution command through the puppet executor.

        command -- dict; the keys read here are 'clusterName', 'commandId',
                   'serviceName', 'taskId' and 'role' (the executor receives
                   the whole dict).

        While the executor is running, self.commandInProgress describes the
        command so that result() can publish an 'IN_PROGRESS' report.

        Returns a one-element list holding the final report, whose status is
        'COMPLETED' for exit code 0 and 'FAILED' otherwise. Raises KeyError
        if one of the keys above is missing; exceptions raised by the
        executor propagate to the caller.
        """
        logger.info("Executing command \n" + pprint.pformat(command))
        clusterName = command['clusterName']
        commandId = command['commandId']
        serviceName = command['serviceName']
        taskId = command['taskId']
        role = command['role']
        tmpout = self.tmpdir + os.sep + 'output-' + str(taskId) + '.txt'
        tmperr = self.tmpdir + os.sep + 'errors-' + str(taskId) + '.txt'

        # Preparing 'IN_PROGRESS' report
        self.commandInProgress = {
            'role': role,
            'actionId': commandId,
            'taskId': taskId,
            'clusterName': clusterName,
            'serviceName': serviceName,
            'tmpout': tmpout,
            'tmperr': tmperr,
        }
        try:
            commandresult = self.executor.runCommand(command, tmpout, tmperr)
        finally:
            # Always clear the marker, otherwise a crashing executor would
            # leave result() reporting this command as running forever.
            self.commandInProgress = None

        status = "COMPLETED" if commandresult['exitcode'] == 0 else "FAILED"
        roleResult = {
            'role': role,
            'actionId': commandId,
            'taskId': taskId,
            # Empty output is reported as the literal string 'None'.
            'stdout': 'None' if commandresult['stdout'] == '' else commandresult['stdout'],
            'clusterName': clusterName,
            'stderr': 'None' if commandresult['stderr'] == '' else commandresult['stderr'],
            'exitCode': commandresult['exitcode'],
            'serviceName': serviceName,
            'status': status,
        }
        return [roleResult]

    def noOpCommand(self, command):
        """Return a stub result that only echoes command['Id'] as 'commandId'."""
        return {'commandId': command['Id']}

    def unknownAction(self, action):
        """Log an action of unknown type and return a stub echoing its 'id'."""
        logger.error('Unknown action: %s' % action['id'])
        return {'id': action['id']}

    def isIdle(self):
        """Return True when no command is waiting in the command queue."""
        return commandQueue.empty()